diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 934515e..591f3e9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -20,11 +20,7 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Vector; +import java.util.*; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; @@ -183,6 +179,9 @@ // Timeline domain writer access control private String modifyACLs = null; + private String flowId = null; + private String flowRunId = null; + // Command line options private Options opts; @@ -256,7 +255,8 @@ public Client(Configuration conf) throws Exception { opts.addOption("shell_args", true, "Command line args for the shell script." + "Multiple args can be separated by empty space."); opts.getOption("shell_args").setArgs(Option.UNLIMITED_VALUES); - opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs"); + opts.addOption("shell_env", true, + "Environment for shell script. Specified as env_key=env_val pairs"); opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers"); opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command"); opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command"); @@ -283,6 +283,10 @@ public Client(Configuration conf) throws Exception { + "modify the timeline entities in the given domain"); opts.addOption("create", false, "Flag to indicate whether to create the " + "domain specified with -domain."); + opts.addOption("flow", true, "ID of the flow which the distributed shell " + + "app belongs to"); + opts.addOption("flow_run", true, "ID of the flowrun which the distributed " + + "shell app belongs to"); opts.addOption("help", false, "Print usage"); opts.addOption("node_label_expression", true, "Node label expression to determine the nodes" @@ -442,6 +446,12 @@ public boolean init(String[] args) throws ParseException { } } + if (cliParser.hasOption("flow")) { + flowId = cliParser.getOptionValue("flow"); + } + if (cliParser.hasOption("flow_run")) { + flowRunId = cliParser.getOptionValue("flow_run"); + } return true; } @@ -533,6 +543,15 @@ public boolean run() throws IOException, YarnException { .setAttemptFailuresValidityInterval(attemptFailuresValidityInterval); } + Set tags = new HashSet(); + if (flowId != null) { + tags.add(TimelineUtils.generateFlowIdTag(flowId)); + } + if (flowRunId != null) { + tags.add(TimelineUtils.generateFlowRunIdTag(flowRunId)); + } + appContext.setApplicationTags(tags); + // set local resources for the application master // local files or archives as needed // In this scenario, the jar file for the application master is part of the local resources diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 0af050c..3864cc6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -33,12 +33,14 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -52,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -208,7 +211,11 @@ public void testDSShell(boolean haveDomain, String timelineVersion) if (timelineVersion.equalsIgnoreCase("v2")) { String[] timelineArgs = { "--timeline_service_version", - "v2" + "v2", + "--flow", + "test_flow_id", + "--flow_run", + "12345678" }; isTestingTimelineV2 = true; args = mergeArgs(args, timelineArgs); @@ -320,53 +327,51 @@ private void checkTimelineV1(boolean haveDomain) throws Exception { } } - private void checkTimelineV2(boolean haveDomain, ApplicationId appId) { - // For PoC check in /tmp/ YARN-3264 - String tmpRoot = FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT; + private void checkTimelineV2( + boolean haveDomain, ApplicationId appId) throws Exception { + // For PoC check in /tmp/timeline_service_data YARN-3264 + String tmpRoot = + FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT + + "/entities/"; File tmpRootFolder = new File(tmpRoot); - Assert.assertTrue(tmpRootFolder.isDirectory()); - - // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs - String outputDirApp = tmpRoot + "/DS_APP_ATTEMPT/"; - - File entityFolder = new File(outputDirApp); - Assert.assertTrue(entityFolder.isDirectory()); - - // there will be at least one attempt, look for that file - String appTimestampFileName = "appattempt_" + appId.getClusterTimestamp() - + "_000" + appId.getId() + "_000001" - + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; - String appAttemptFileName = outputDirApp + appTimestampFileName; - File appAttemptFile = new File(appAttemptFileName); - Assert.assertTrue(appAttemptFile.exists()); - - String outputDirContainer = tmpRoot + "/DS_CONTAINER/"; - File containerFolder = new File(outputDirContainer); - 
Assert.assertTrue(containerFolder.isDirectory()); - - String containerTimestampFileName = "container_" - + appId.getClusterTimestamp() + "_000" + appId.getId() - + "_01_000002.thist"; - String containerFileName = outputDirContainer + containerTimestampFileName; - File containerFile = new File(containerFileName); - Assert.assertTrue(containerFile.exists()); - String appTimeStamp = appId.getClusterTimestamp() + "_" + appId.getId() - + "_"; - deleteAppFiles(new File(outputDirApp), appTimeStamp); - deleteAppFiles(new File(outputDirContainer), appTimeStamp); - tmpRootFolder.delete(); - } - - private void deleteAppFiles(File rootDir, String appTimeStamp) { - boolean deleted = false; - File[] listOfFiles = rootDir.listFiles(); - for (File f1 : listOfFiles) { - // list all attempts for this app and delete them - if (f1.getName().contains(appTimeStamp)){ - deleted = f1.delete(); - Assert.assertTrue(deleted); - } + try { + Assert.assertTrue(tmpRootFolder.isDirectory()); + + // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs + String outputDirApp = tmpRoot + + TimelineUtils.generateDefaultClusterIdBasedOnAppId(appId) + "/" + + UserGroupInformation.getCurrentUser().getShortUserName() + + "/test_flow_id/12345678/" + appId.toString() + "/DS_APP_ATTEMPT/"; + + File entityFolder = new File(outputDirApp); + Assert.assertTrue(entityFolder.isDirectory()); + + // there will be at least one attempt, look for that file + String appTimestampFileName = "appattempt_" + appId.getClusterTimestamp() + + "_000" + appId.getId() + "_000001" + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + String appAttemptFileName = outputDirApp + appTimestampFileName; + File appAttemptFile = new File(appAttemptFileName); + Assert.assertTrue(appAttemptFile.exists()); + + String outputDirContainer = tmpRoot + + TimelineUtils.generateDefaultClusterIdBasedOnAppId(appId) + "/" + + UserGroupInformation.getCurrentUser().getShortUserName() + + "/test_flow_id/12345678/" + appId.toString() + "/DS_CONTAINER/"; + File containerFolder = new File(outputDirContainer); + Assert.assertTrue(containerFolder.isDirectory()); + + String containerTimestampFileName = "container_" + + appId.getClusterTimestamp() + "_000" + appId.getId() + + "_01_000002.thist"; + String containerFileName = outputDirContainer + containerTimestampFileName; + File containerFile = new File(containerFileName); + Assert.assertTrue(containerFile.exists()); + String appTimeStamp = appId.getClusterTimestamp() + "_" + appId.getId() + + "_"; + } finally { + FileUtils.deleteDirectory(tmpRootFolder.getParentFile()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java index 02b5eb4..842721e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; import org.codehaus.jackson.JsonGenerationException; @@ -40,6 +41,9 @@ @Evolving public class 
TimelineUtils { + public static final String FLOW_ID_TAG_PREFIX = "TIMELINE_FLOW_ID_TAG"; + public static final String FLOW_RUN_ID_TAG_PREFIX = "TIMELINE_FLOW_RUN_ID_TAG"; + private static ObjectMapper mapper; static { @@ -105,4 +109,21 @@ public static Text buildTimelineTokenService(Configuration conf) { getTimelineTokenServiceAddress(conf); return SecurityUtil.buildTokenService(timelineServiceAddr); } + + public static String generateDefaultFlowIdBasedOnAppId(ApplicationId appId) { + return "flow_" + appId.getClusterTimestamp() + "_" + appId.getId(); + } + + public static String generateDefaultClusterIdBasedOnAppId( + ApplicationId appId) { + return "cluster_" + appId.getClusterTimestamp(); + } + + public static String generateFlowIdTag(String flowId) { + return FLOW_ID_TAG_PREFIX + ":" + flowId; + } + + public static String generateFlowRunIdTag(String flowRunId) { + return FLOW_RUN_ID_TAG_PREFIX + ":" + flowRunId; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java index 26c121a..a95f8b2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java @@ -21,6 +21,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; @@ -54,4 +56,18 @@ ReportNewCollectorInfoResponse reportNewCollectorInfo( ReportNewCollectorInfoRequest request) throws YarnException, IOException; + /** + *
+ * The aggregator needs to get the context information including user, flow
+ * and flow run ID to associate with every incoming put-entity request.
+ *
+ * @param request the request of getting the aggregator context information of + * the given application + * @return + * @throws YarnException + * @throws IOException + */ + GetTimelineCollectorContextResponse getTimelineCollectorContext( + GetTimelineCollectorContextRequest request) + throws YarnException, IOException; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java index 276a540..ace86c9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java @@ -30,11 +30,16 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto; import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocolPB; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetTimelineCollectorContextRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetTimelineCollectorContextResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoResponsePBImpl; @@ -85,6 +90,21 @@ public ReportNewCollectorInfoResponse reportNewCollectorInfo( } @Override + public GetTimelineCollectorContextResponse getTimelineCollectorContext( + GetTimelineCollectorContextRequest request) + throws YarnException, IOException { + GetTimelineCollectorContextRequestProto requestProto = + ((GetTimelineCollectorContextRequestPBImpl) request).getProto(); + try { + return new GetTimelineCollectorContextResponsePBImpl( + proxy.getTimelienCollectorContext(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override public void close() { if (this.proxy != null) { RPC.stopProxy(this.proxy); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java index 3f42732..55a0169 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java @@ -20,11 +20,16 @@ import java.io.IOException; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextResponseProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoResponseProto; import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocolPB; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetTimelineCollectorContextRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetTimelineCollectorContextResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoResponsePBImpl; @@ -56,4 +61,20 @@ public ReportNewCollectorInfoResponseProto reportNewCollectorInfo( } } + @Override + public GetTimelineCollectorContextResponseProto getTimelienCollectorContext( + RpcController controller, + GetTimelineCollectorContextRequestProto proto) throws ServiceException { + GetTimelineCollectorContextRequestPBImpl request = + new GetTimelineCollectorContextRequestPBImpl(proto); + try { + GetTimelineCollectorContextResponse response = + real.getTimelineCollectorContext(request); + return ((GetTimelineCollectorContextResponsePBImpl)response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextRequest.java new file mode 100644 index 0000000..604a40b --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextRequest.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.protocolrecords; + + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.util.Records; + +public abstract class GetTimelineCollectorContextRequest { + + public static GetTimelineCollectorContextRequest newInstance( + ApplicationId appId) { + GetTimelineCollectorContextRequest request = + Records.newRecord(GetTimelineCollectorContextRequest.class); + request.setApplicationId(appId); + return request; + } + + public abstract ApplicationId getApplicationId(); + + public abstract void setApplicationId(ApplicationId appId); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java new file mode 100644 index 0000000..1558e2f --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.api.protocolrecords; + + +import org.apache.hadoop.yarn.util.Records; + +public abstract class GetTimelineCollectorContextResponse { + + public static GetTimelineCollectorContextResponse newInstance( + String userId, String flowId, String flowRunId) { + GetTimelineCollectorContextResponse response = + Records.newRecord(GetTimelineCollectorContextResponse.class); + response.setUserId(userId); + response.setFlowId(flowId); + response.setFlowRunId(flowRunId); + return response; + } + + public abstract String getUserId(); + + public abstract void setUserId(String userId); + + public abstract String getFlowId(); + + public abstract void setFlowId(String flowId); + + public abstract String getFlowRunId(); + + public abstract void setFlowRunId(String flowRunId); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextRequestPBImpl.java new file mode 100644 index 0000000..b53b55b --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextRequestPBImpl.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + + +import com.google.protobuf.TextFormat; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextRequestProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextRequestProto; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; + +public class GetTimelineCollectorContextRequestPBImpl extends + GetTimelineCollectorContextRequest { + + GetTimelineCollectorContextRequestProto + proto = GetTimelineCollectorContextRequestProto.getDefaultInstance(); + GetTimelineCollectorContextRequestProto.Builder builder = null; + boolean viaProto = false; + + private ApplicationId appId = null; + + public GetTimelineCollectorContextRequestPBImpl() { + builder = GetTimelineCollectorContextRequestProto.newBuilder(); + } + + public GetTimelineCollectorContextRequestPBImpl( + GetTimelineCollectorContextRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public GetTimelineCollectorContextRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + if (appId != null) { + builder.setAppId(convertToProtoFormat(this.appId)); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = GetTimelineCollectorContextRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public ApplicationId getApplicationId() { + if (this.appId != null) { + return this.appId; + } + + GetTimelineCollectorContextRequestProtoOrBuilder p = viaProto ? 
proto : builder; + if (!p.hasAppId()) { + return null; + } + + this.appId = convertFromProtoFormat(p.getAppId()); + return this.appId; + } + + @Override + public void setApplicationId(ApplicationId appId) { + maybeInitBuilder(); + if (appId == null) + builder.clearAppId(); + this.appId = appId; + } + + private ApplicationIdPBImpl convertFromProtoFormat(YarnProtos.ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + private YarnProtos.ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl)t).getProto(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java new file mode 100644 index 0000000..6dc1f77 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + + +import com.google.protobuf.TextFormat; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextResponseProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextResponseProto; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; + +public class GetTimelineCollectorContextResponsePBImpl extends + GetTimelineCollectorContextResponse { + + GetTimelineCollectorContextResponseProto proto = + GetTimelineCollectorContextResponseProto.getDefaultInstance(); + GetTimelineCollectorContextResponseProto.Builder builder = null; + boolean viaProto = false; + + public GetTimelineCollectorContextResponsePBImpl() { + builder = GetTimelineCollectorContextResponseProto.newBuilder(); + } + + public GetTimelineCollectorContextResponsePBImpl( + GetTimelineCollectorContextResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public GetTimelineCollectorContextResponseProto getProto() { + mergeLocalToProto(); + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = GetTimelineCollectorContextResponseProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public String getUserId() { + GetTimelineCollectorContextResponseProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasUserId()) { + return null; + } + return p.getUserId(); + } + + @Override + public void setUserId(String userId) { + maybeInitBuilder(); + if (userId == null) { + builder.clearUserId(); + return; + } + builder.setUserId(userId); + } + + @Override + public String getFlowId() { + GetTimelineCollectorContextResponseProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasFlowId()) { + return null; + } + return p.getFlowId(); + } + + @Override + public void setFlowId(String flowId) { + maybeInitBuilder(); + if (flowId == null) { + builder.clearFlowId(); + return; + } + builder.setFlowId(flowId); + } + + @Override + public String getFlowRunId() { + GetTimelineCollectorContextResponseProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasFlowRunId()) { + return null; + } + return p.getFlowRunId(); + } + + @Override + public void setFlowRunId(String flowRunId) { + maybeInitBuilder(); + if (flowRunId == null) { + builder.clearFlowRunId(); + return; + } + builder.setFlowRunId(flowRunId); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto index 654a9f2..dfb2ed7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto @@ -26,4 +26,5 @@ import "yarn_server_common_service_protos.proto"; service CollectorNodemanagerProtocolService { rpc reportNewCollectorInfo (ReportNewCollectorInfoRequestProto) returns (ReportNewCollectorInfoResponseProto); + rpc getTimelienCollectorContext (GetTimelineCollectorContextRequestProto) returns (GetTimelineCollectorContextResponseProto); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 0086bae..f733371 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -87,6 +87,15 @@ message ReportNewCollectorInfoRequestProto { message ReportNewCollectorInfoResponseProto { } +message 
GetTimelineCollectorContextRequestProto { + optional ApplicationIdProto appId = 1; +} + +message GetTimelineCollectorContextResponseProto { + optional string user_id = 1; + optional string flow_id = 2; + optional string flow_run_id = 3; +} message NMContainerStatusProto { optional ContainerIdProto container_id = 1; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java index cfc3dc6..3c9f57b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java @@ -60,6 +60,8 @@ import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; @@ -166,6 +168,31 @@ public void testRPCOnCollectorNodeManagerProtocol() throws IOException { Assert.assertTrue(e.getMessage().contains(ILLEGAL_NUMBER_MESSAGE)); } + // Verify request with a valid app ID + try { + GetTimelineCollectorContextRequest request = + GetTimelineCollectorContextRequest.newInstance( + ApplicationId.newInstance(0, 1)); + GetTimelineCollectorContextResponse response = + proxy.getTimelineCollectorContext(request); + Assert.assertEquals("test_user_id", response.getUserId()); + Assert.assertEquals("test_flow_id", response.getFlowId()); + Assert.assertEquals("test_flow_run_id", response.getFlowRunId()); + } catch (YarnException | IOException e) { + Assert.fail("RPC call failured is not expected here."); + } + + // Verify request with an invalid app ID + try { + GetTimelineCollectorContextRequest request = + GetTimelineCollectorContextRequest.newInstance( + ApplicationId.newInstance(0, 2)); + proxy.getTimelineCollectorContext(request); + Assert.fail("RPC call failured is expected here."); + } catch (YarnException | IOException e) { + Assert.assertTrue(e instanceof YarnException); + Assert.assertTrue(e.getMessage().contains("The application is not found.")); + } server.stop(); } @@ -340,6 +367,18 @@ public ReportNewCollectorInfoResponse reportNewCollectorInfo( recordFactory.newRecordInstance(ReportNewCollectorInfoResponse.class); return response; } + + @Override + public GetTimelineCollectorContextResponse getTimelineCollectorContext( + GetTimelineCollectorContextRequest request) + throws YarnException, IOException { + if (request.getApplicationId().getId() == 1) { + return GetTimelineCollectorContextResponse.newInstance( + "test_user_id", "test_flow_id", "test_flow_run_id"); + } else { + throw new YarnException("The application is not found."); + } + } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java index 009fa63..6ccea84 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java @@ -30,13 +30,17 @@ import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; public class NMCollectorService extends CompositeService implements CollectorNodemanagerProtocol { @@ -93,7 +97,7 @@ public void serviceStop() throws Exception { @Override public ReportNewCollectorInfoResponse reportNewCollectorInfo( - ReportNewCollectorInfoRequest request) throws IOException { + ReportNewCollectorInfoRequest request) throws YarnException, IOException { List newCollectorsList = request.getAppCollectorsList(); if (newCollectorsList != null && !newCollectorsList.isEmpty()) { Map newCollectorsMap = @@ -107,4 +111,16 @@ public ReportNewCollectorInfoResponse reportNewCollectorInfo( return ReportNewCollectorInfoResponse.newInstance(); } + @Override + public GetTimelineCollectorContextResponse getTimelineCollectorContext( + GetTimelineCollectorContextRequest request) + throws YarnException, IOException { + Application app = context.getApplications().get(request.getApplicationId()); + if (app == null) { + throw new YarnException("Application " + request.getApplicationId() + + " doesn't exist on NM."); + } + return GetTimelineCollectorContextResponse.newInstance( + app.getUser(), app.getFlowId(), app.getFlowRunId()); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index acac600..6ac15a6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -139,6 +139,7 @@ import 
com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; public class ContainerManagerImpl extends CompositeService implements ServiceStateChangeListener, ContainerManagementProtocol, @@ -293,8 +294,9 @@ private void recoverApplication(ContainerManagerApplicationProto p) } LOG.info("Recovering application " + appId); - ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId, - creds, context); + //TODO: Recover flow and flow run ID + ApplicationImpl app = new ApplicationImpl( + dispatcher, p.getUser(), null, null, appId, creds, context); context.getApplications().put(appId, app); app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext)); } @@ -849,8 +851,12 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier, try { if (!serviceStopped) { // Create the application - Application application = - new ApplicationImpl(dispatcher, user, applicationID, credentials, context); + String flowId = launchContext.getEnvironment().get( + TimelineUtils.FLOW_ID_TAG_PREFIX); + String flowRunId = launchContext.getEnvironment().get( + TimelineUtils.FLOW_RUN_ID_TAG_PREFIX); + Application application = new ApplicationImpl( + dispatcher, user, flowId, flowRunId, applicationID, credentials, context); if (null == context.getApplications().putIfAbsent(applicationID, application)) { LOG.info("Creating a new application reference for app " + applicationID); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java index b1571e9..decd17d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java @@ -35,4 +35,8 @@ ApplicationState getApplicationState(); + String getFlowId(); + + String getFlowRunId(); + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index 5f84b4f..ceaafe8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -65,6 +65,8 @@ final Dispatcher dispatcher; final String user; + final String flowId; + final String flowRunId; final ApplicationId appId; final Credentials credentials; Map applicationACLs; @@ -80,10 +82,13 @@ Map containers = new HashMap(); - public ApplicationImpl(Dispatcher dispatcher, String user, ApplicationId appId, - Credentials credentials, 
Context context) { + public ApplicationImpl(Dispatcher dispatcher, String user, String flowId, + String flowRunId, ApplicationId appId, Credentials credentials, + Context context) { this.dispatcher = dispatcher; this.user = user; + this.flowId = flowId; + this.flowRunId = flowRunId; this.appId = appId; this.credentials = credentials; this.aclsManager = context.getApplicationACLsManager(); @@ -488,4 +493,12 @@ public LogAggregationContext getLogAggregationContext() { this.readLock.unlock(); } } + + public String getFlowId() { + return flowId; + } + + public String getFlowRunId() { + return flowRunId; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java index 370a207..5303df5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java @@ -530,7 +530,8 @@ public boolean matches(Object argument) { this.user = user; this.appId = BuilderUtils.newApplicationId(timestamp, id); - app = new ApplicationImpl(dispatcher, this.user, appId, null, context); + app = new ApplicationImpl( + dispatcher, this.user, null, null, appId, null, context); containers = new ArrayList(); for (int i = 0; i < numContainers; i++) { Container container = createMockedContainer(this.appId, i); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java index 4e13010..35b95ee 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java @@ -39,6 +39,9 @@ Map containers = new HashMap(); ApplicationState appState; Application app; + String flowId; + String flowRunId; + public MockApp(int uniqId) { this("mockUser", 1234, uniqId); @@ -77,4 +80,11 @@ public ApplicationState getApplicationState() { public void handle(ApplicationEvent event) {} + public String getFlowId() { + return flowId; + } + + public String getFlowRunId() { + return flowRunId; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java index 5a89e74..1c7ea54 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java @@ -327,7 +327,7 @@ public void testContainerLogs() throws IOException { final String filename = "logfile1"; final String logMessage = "log message\n"; nmContext.getApplications().put(appId, new ApplicationImpl(null, "user", - appId, null, nmContext)); + null, null, appId, null, nmContext)); MockContainer container = new MockContainer(appAttemptId, new AsyncDispatcher(), new Configuration(), "user", appId, 1); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index 0dd9ba1..d74cf30 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; /** * The launch of the AM itself. @@ -217,7 +218,19 @@ private void setupTokens( environment.put(ApplicationConstants.MAX_APP_ATTEMPTS_ENV, String.valueOf(rmContext.getRMApps().get( applicationId).getMaxAppAttempts())); - + for (String tag : + rmContext.getRMApps().get(applicationId).getApplicationTags()) { + if (tag.startsWith(TimelineUtils.FLOW_ID_TAG_PREFIX + ":") || + tag.startsWith(TimelineUtils.FLOW_ID_TAG_PREFIX.toLowerCase() + ":")) { + environment.put(TimelineUtils.FLOW_ID_TAG_PREFIX, + tag.substring(TimelineUtils.FLOW_ID_TAG_PREFIX.length() + 1)); + } + if (tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX + ":") || + tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.toLowerCase() + ":")) { + environment.put(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX, + tag.substring(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.length() + 1)); + } + } Credentials credentials = new Credentials(); DataInputByteBuffer dibb = new DataInputByteBuffer(); if (container.getTokens() != null) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java index 7d59876..f7cd10e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java @@ -18,9 +18,14 @@ package org.apache.hadoop.yarn.server.timelineservice.collector; +import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; 
import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; /** * Service that handles writes to the timeline service and writes them to the @@ -31,16 +36,24 @@ @Private @Unstable public class AppLevelTimelineCollector extends TimelineCollector { - private final String applicationId; - // TODO define key metadata such as flow metadata, user, and queue + private final ApplicationId appId; + private final TimelineCollectorContext context; - public AppLevelTimelineCollector(String applicationId) { - super(AppLevelTimelineCollector.class.getName() + " - " + applicationId); - this.applicationId = applicationId; + public AppLevelTimelineCollector(ApplicationId appId) { + super(AppLevelTimelineCollector.class.getName() + " - " + appId.toString()); + Preconditions.checkNotNull(appId, "AppId shouldn't be null"); + this.appId = appId; + context = new TimelineCollectorContext(); } @Override protected void serviceInit(Configuration conf) throws Exception { + context.setClusterId(conf.get(YarnConfiguration.RM_CLUSTER_ID, + TimelineUtils.generateDefaultClusterIdBasedOnAppId(appId))); + context.setUserId(UserGroupInformation.getCurrentUser().getShortUserName()); + context.setFlowId(TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId)); + context.setFlowRunId("0"); + context.setAppId(appId.toString()); super.serviceInit(conf); } @@ -54,4 +67,9 @@ protected void serviceStop() throws Exception { super.serviceStop(); } + @Override + protected TimelineCollectorContext getTimelineEntityContext() { + return context; + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java index 59ecef1..2017d01 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java @@ -95,7 +95,7 @@ protected void serviceStop() throws Exception { */ public boolean addApplication(ApplicationId appId) { AppLevelTimelineCollector collector = - new AppLevelTimelineCollector(appId.toString()); + new AppLevelTimelineCollector(appId); return (collectorManager.putIfAbsent(appId, collector) == collector); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java index 6e20e69..677feb1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; + /** * Service that handles writes to the timeline service and writes them to the * backing storage. @@ -83,21 +84,24 @@ public TimelineWriter getWriter() { * * This method should be reserved for selected critical entities and events. * For normal voluminous writes one should use the async method - * {@link #postEntitiesAsync(TimelineEntities, UserGroupInformation)}. + * {@link #putEntitiesAsync(TimelineEntities, UserGroupInformation)}. * * @param entities entities to post * @param callerUgi the caller UGI * @return the response that contains the result of the post. */ - public TimelineWriteResponse postEntities(TimelineEntities entities, + public TimelineWriteResponse putEntities(TimelineEntities entities, UserGroupInformation callerUgi) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("SUCCESS - TIMELINE V2 PROTOTYPE"); - LOG.debug("postEntities(entities=" + entities + ", callerUgi=" + LOG.debug("putEntities(entities=" + entities + ", callerUgi=" + callerUgi + ")"); } - return writer.write(entities); + TimelineCollectorContext context = getTimelineEntityContext(); + return writer.write(context.getClusterId(), context.getUserId(), + context.getFlowId(), context.getFlowRunId(), context.getAppId(), + entities); } /** @@ -111,12 +115,15 @@ public TimelineWriteResponse postEntities(TimelineEntities entities, * @param entities entities to post * @param callerUgi the caller UGI */ - public void postEntitiesAsync(TimelineEntities entities, + public void putEntitiesAsync(TimelineEntities entities, UserGroupInformation callerUgi) { // TODO implement if (LOG.isDebugEnabled()) { - LOG.debug("postEntitiesAsync(entities=" + entities + ", callerUgi=" + + LOG.debug("putEntitiesAsync(entities=" + entities + ", callerUgi=" + callerUgi + ")"); } } + + protected abstract TimelineCollectorContext getTimelineEntityContext(); + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java new file mode 100644 index 0000000..c1a10a6 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java
new file mode 100644
index 0000000..c1a10a6
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+public class TimelineCollectorContext {
+
+  private String clusterId;
+  private String userId;
+  private String flowId;
+  private String flowRunId;
+  private String appId;
+
+  public TimelineCollectorContext() {
+    this(null, null, null, null, null);
+  }
+
+  public TimelineCollectorContext(String clusterId, String userId,
+      String flowId, String flowRunId, String appId) {
+    this.clusterId = clusterId;
+    this.userId = userId;
+    this.flowId = flowId;
+    this.flowRunId = flowRunId;
+    this.appId = appId;
+  }
+
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  public void setClusterId(String clusterId) {
+    this.clusterId = clusterId;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public void setUserId(String userId) {
+    this.userId = userId;
+  }
+
+  public String getFlowId() {
+    return flowId;
+  }
+
+  public void setFlowId(String flowId) {
+    this.flowId = flowId;
+  }
+
+  public String getFlowRunId() {
+    return flowRunId;
+  }
+
+  public void setFlowRunId(String flowRunId) {
+    this.flowRunId = flowRunId;
+  }
+
+  public String getAppId() {
+    return appId;
+  }
+
+  public void setAppId(String appId) {
+    this.appId = appId;
+  }
+}
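A brief sketch of the new context object on its own, with illustrative values: it is a plain holder for the five dimensions every write is now keyed by, and each field can still be overridden after construction.

package org.apache.hadoop.yarn.server.timelineservice.collector;

// Sketch: the five-argument constructor mirrors the dimensions passed to
// TimelineWriter.write(); any field can be replaced later.
public class ContextSketch {
  public static void main(String[] args) {
    TimelineCollectorContext context = new TimelineCollectorContext(
        "yarn-cluster", "alice", "distributed_shell_flow", "12345678",
        "application_1425026901855_0001");
    context.setFlowRunId("12345679");  // e.g. replaced once the NM reports it
    System.out.println(context.getFlowId() + "/" + context.getFlowRunId());
  }
}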
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
index 3a4515e..909027e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
@@ -43,6 +43,8 @@
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
 import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
 import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
@@ -102,6 +104,7 @@ public void serviceInit(Configuration conf) throws Exception {
 
   @Override
   protected void serviceStart() throws Exception {
+    nmCollectorService = getNMCollectorService();
     startWebApp();
     super.serviceStart();
   }
@@ -151,11 +154,11 @@ public TimelineCollector putIfAbsent(ApplicationId appId,
     // Report to NM if a new collector is added.
     if (collectorIsNew) {
       try {
+        updateTimelineCollectorContext(appId, collector);
         reportNewCollectorToNM(appId);
       } catch (Exception e) {
-        // throw exception here as it cannot be used if failed report to NM
-        LOG.error("Failed to report a new collector for application: " + appId +
-            " to the NM Collector Service.");
+        // throw exception here as the collector cannot be used if we failed
+        // to communicate with the NM
+        LOG.error("Failed to communicate with NM Collector Service for " + appId);
         throw new YarnRuntimeException(e);
       }
     }
@@ -250,7 +253,6 @@ private void startWebApp() {
 
   private void reportNewCollectorToNM(ApplicationId appId)
       throws YarnException, IOException {
-    this.nmCollectorService = getNMCollectorService();
     ReportNewCollectorInfoRequest request =
         ReportNewCollectorInfoRequest.newInstance(appId,
             this.timelineRestServerBindAddress);
@@ -259,6 +261,28 @@ private void reportNewCollectorToNM(ApplicationId appId)
     nmCollectorService.reportNewCollectorInfo(request);
   }
 
+  private void updateTimelineCollectorContext(
+      ApplicationId appId, TimelineCollector collector)
+      throws YarnException, IOException {
+    GetTimelineCollectorContextRequest request =
+        GetTimelineCollectorContextRequest.newInstance(appId);
+    LOG.info("Get timeline collector context for " + appId);
+    GetTimelineCollectorContextResponse response =
+        nmCollectorService.getTimelineCollectorContext(request);
+    String userId = response.getUserId();
+    if (userId != null && !userId.isEmpty()) {
+      collector.getTimelineEntityContext().setUserId(userId);
+    }
+    String flowId = response.getFlowId();
+    if (flowId != null && !flowId.isEmpty()) {
+      collector.getTimelineEntityContext().setFlowId(flowId);
+    }
+    String flowRunId = response.getFlowRunId();
+    if (flowRunId != null && !flowRunId.isEmpty()) {
+      collector.getTimelineEntityContext().setFlowRunId(flowRunId);
+    }
+  }
+
   @VisibleForTesting
   protected CollectorNodemanagerProtocol getNMCollectorService() {
     Configuration conf = getConfig();
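The override logic above keeps the defaults the collector derived from the ApplicationId unless the NM returns non-empty values. The helper below is a hypothetical condensation of that rule, not part of the patch, shown only to make the pattern explicit.

package org.apache.hadoop.yarn.server.timelineservice.collector;

// Hypothetical helper illustrating the "only override with non-empty values"
// rule used by updateTimelineCollectorContext() above.
final class ContextOverrideSketch {
  private ContextOverrideSketch() {}

  static void overrideUserId(TimelineCollectorContext ctx, String userId) {
    if (userId != null && !userId.isEmpty()) {
      ctx.setUserId(userId);              // NM-supplied value wins
    }                                     // otherwise keep the derived default
  }
}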
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
index 5adae71..0f51656 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
@@ -138,7 +138,7 @@ public Response putEntities(
         LOG.error("Application not found");
         throw new NotFoundException(); // different exception?
       }
-      collector.postEntities(entities, callerUgi);
+      collector.putEntities(entities, callerUgi);
       return Response.ok().build();
     } catch (Exception e) {
       LOG.error("Error putting entities", e);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
index f5603f6..41b6ac9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
@@ -52,7 +52,9 @@
   /** default value for storage location on local disk */
   public static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
-      = "/tmp/timeline_service_data/";
+      = "/tmp/timeline_service_data";
+
+  private static final String ENTITIES_DIR = "entities";
 
   /** Default extension for output files */
   public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
 
@@ -61,38 +63,25 @@
     super((FileSystemTimelineWriterImpl.class.getName()));
   }
 
-  /**
-   * Stores the entire information in {@link TimelineEntity} to the
-   * timeline store. Any errors occurring for individual write request objects
-   * will be reported in the response.
-   *
-   * @param data
-   *          a {@link TimelineEntity} object
-   * @return {@link TimelineWriteResponse} object.
-   * @throws IOException
-   */
   @Override
-  public TimelineWriteResponse write(TimelineEntities entities)
-      throws IOException {
+  public TimelineWriteResponse write(String clusterId, String userId,
+      String flowId, String flowRunId, String appId,
+      TimelineEntities entities) throws IOException {
     TimelineWriteResponse response = new TimelineWriteResponse();
     for (TimelineEntity entity : entities.getEntities()) {
-      write(entity, response);
+      write(clusterId, userId, flowId, flowRunId, appId, entity, response);
     }
     return response;
   }
 
-  private void write(TimelineEntity entity,
+  private void write(String clusterId, String userId,
+      String flowId, String flowRunId, String appId, TimelineEntity entity,
       TimelineWriteResponse response) throws IOException {
     PrintWriter out = null;
     try {
-      File outputDir = new File(outputRoot + entity.getType());
-      String fileName = outputDir + "/" + entity.getId()
-          + TIMELINE_SERVICE_STORAGE_EXTENSION;
-      if (!outputDir.exists()) {
-        if (!outputDir.mkdirs()) {
-          throw new IOException("Could not create directories for " + fileName);
-        }
-      }
+      String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId, flowId,
+          flowRunId, appId, entity.getType());
+      String fileName = dir + entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION;
       out = new PrintWriter(new BufferedWriter(new FileWriter(fileName, true)));
       out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity));
       out.write("\n");
@@ -112,20 +101,7 @@ private void write(TimelineEntity entity,
     }
   }
 
-  /**
-   * Aggregates the entity information to the timeline store based on which
-   * track this entity is to be rolled up to The tracks along which aggregations
-   * are to be done are given by {@link TimelineAggregationTrack}
-   *
-   * Any errors occurring for individual write request objects will be reported
-   * in the response.
-   *
-   * @param data
-   *          a {@link TimelineEntity} object
-   *          a {@link TimelineAggregationTrack} enum value
-   * @return a {@link TimelineWriteResponse} object.
-   * @throws IOException
-   */
+  @Override
   public TimelineWriteResponse aggregate(TimelineEntity data,
       TimelineAggregationTrack track) throws IOException {
     return null;
@@ -141,4 +117,23 @@ public void serviceInit(Configuration conf) throws Exception {
     outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
         DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT);
   }
+
+  @Override
+  public void serviceStart() throws Exception {
+    mkdirs(outputRoot, ENTITIES_DIR);
+  }
+
+  private static String mkdirs(String... dirStrs) throws IOException {
+    StringBuilder path = new StringBuilder();
+    for (String dirStr : dirStrs) {
+      path.append(dirStr).append('/');
+      File dir = new File(path.toString());
+      if (!dir.exists()) {
+        if (!dir.mkdirs()) {
+          throw new IOException("Could not create directories for " + dir);
+        }
+      }
+    }
+    return path.toString();
+  }
 }
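To make the new on-disk layout concrete, here is a sketch of the path an entity ends up under. Only the root constant, the entities/ segment, and the .thist extension come from the code above; the remaining path segments are illustrative.

import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;

// Sketch: the directory hierarchy mirrors the write() arguments in order:
// <root>/entities/<cluster>/<user>/<flow>/<flowrun>/<app>/<entity type>/<entity id>.thist
public class StoragePathSketch {
  public static void main(String[] args) {
    String path = FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
        + "/entities/yarn-cluster/alice/distributed_shell_flow/12345678"
        + "/application_1425026901855_0001/DS_CONTAINER"
        + "/container_1425026901855_0001_01_000002"
        + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
    System.out.println(path);
  }
}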
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
index 71ad7ab..492e3a9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
@@ -39,12 +39,19 @@
    * timeline store. Any errors occurring for individual write request objects
    * will be reported in the response.
    *
+   * @param clusterId context cluster ID
+   * @param userId context user ID
+   * @param flowId context flow ID
+   * @param flowRunId context flow run ID
+   * @param appId context app ID
    * @param data
    *          a {@link TimelineEntities} object.
    * @return a {@link TimelineWriteResponse} object.
    * @throws IOException
    */
-  TimelineWriteResponse write(TimelineEntities data) throws IOException;
+  TimelineWriteResponse write(String clusterId, String userId,
+      String flowId, String flowRunId, String appId,
+      TimelineEntities data) throws IOException;
 
   /**
    * Aggregates the entity information to the timeline store based on which
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java
index 541665b..b4f9221 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java
@@ -49,7 +49,7 @@ public void testMultithreadedAdd() throws Exception {
       Callable<Boolean> task = new Callable<Boolean>() {
         public Boolean call() {
           AppLevelTimelineCollector collector =
-              new AppLevelTimelineCollector(appId.toString());
+              new AppLevelTimelineCollector(appId);
           return (collectorManager.putIfAbsent(appId, collector) == collector);
         }
       };
@@ -82,7 +82,7 @@ public void testMultithreadedAddAndRemove() throws Exception {
       Callable<Boolean> task = new Callable<Boolean>() {
         public Boolean call() {
           AppLevelTimelineCollector collector =
-              new AppLevelTimelineCollector(appId.toString());
+              new AppLevelTimelineCollector(appId);
           boolean successPut =
               (collectorManager.putIfAbsent(appId, collector) == collector);
           return successPut && collectorManager.remove(appId.toString());
fsi.write("cluster_id", "user_id", "flow_id", "flow_run_id", "app_id", te); - String fileName = fsi.getOutputRoot() + "/" + type + "/" + id - + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + String fileName = fsi.getOutputRoot() + + "/entities/cluster_id/user_id/flow_id/flow_run_id/app_id/" + type + + "/" + id + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; Path path = Paths.get(fileName); File f = new File(fileName); assertTrue(f.exists() && !f.isDirectory()); @@ -73,6 +76,11 @@ public void testWriteEntityToFile() throws Exception { File outputDir = new File(fsi.getOutputRoot()); FileUtils.deleteDirectory(outputDir); assertTrue(!(f.exists())); + } finally { + if (fsi != null) { + fsi.stop(); + FileUtils.deleteDirectory(new File(fsi.getOutputRoot())); + } } } }