diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index df9f34b..5a7c9cf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -86,6 +86,7 @@ import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; @@ -246,6 +247,9 @@ // File length needed for local resource private long shellScriptPathLen = 0; + // Timeline domain ID + private String domainId = null; + // Hardcoded path to shell script in launch container's local env private static final String ExecShellStringPath = Client.SCRIPT_PATH + ".sh"; private static final String ExecBatScripStringtPath = Client.SCRIPT_PATH @@ -465,7 +469,9 @@ public boolean init(String[] args) throws ParseException, IOException { shellScriptPathLen = Long.valueOf(envs .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)); } - + if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) { + domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN); + } if (!scriptPath.isEmpty() && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) { LOG.error("Illegal values in env for shell script path" + ", path=" @@ -517,7 +523,7 @@ public void run() throws YarnException, IOException { LOG.info("Starting ApplicationMaster"); try { publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), - DSEvent.DS_APP_ATTEMPT_START); + DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); } catch (Exception e) { LOG.error("App Attempt start event could not be published for " + appAttemptID.toString(), e); @@ -614,7 +620,7 @@ public void run() throws YarnException, IOException { numRequestedContainers.set(numTotalContainers); try { publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), - DSEvent.DS_APP_ATTEMPT_END); + DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); } catch (Exception e) { LOG.error("App Attempt start event could not be published for " + appAttemptID.toString(), e); @@ -725,7 +731,8 @@ public void onContainersCompleted(List completedContainers) { + containerStatus.getContainerId()); } try { - publishContainerEndEvent(timelineClient, containerStatus); + publishContainerEndEvent( + timelineClient, containerStatus, domainId, appSubmitterUgi); } catch (Exception e) { LOG.error("Container start event could not be published for " + containerStatus.getContainerId().toString(), e); @@ -846,7 +853,8 @@ public void onContainerStarted(ContainerId containerId, } try { ApplicationMaster.publishContainerStartEvent( - applicationMaster.timelineClient, container); + applicationMaster.timelineClient, container, + applicationMaster.domainId, applicationMaster.appSubmitterUgi); } catch (Exception e) { LOG.error("Container start event could not be published for " + container.getId().toString(), e); @@ -1050,13 +1058,14 @@ private String readContent(String filePath) throws IOException { } } - private static void publishContainerStartEvent(TimelineClient timelineClient, - Container container) throws IOException, YarnException { - TimelineEntity entity = new TimelineEntity(); + private static void publishContainerStartEvent( + final TimelineClient timelineClient, Container container, String domainId, + UserGroupInformation ugi) throws IOException, InterruptedException { + final TimelineEntity entity = new TimelineEntity(); entity.setEntityId(container.getId().toString()); entity.setEntityType(DSEntity.DS_CONTAINER.toString()); - entity.addPrimaryFilter("user", - UserGroupInformation.getCurrentUser().getShortUserName()); + entity.setDomainId(domainId); + entity.addPrimaryFilter("user", ugi.getShortUserName()); TimelineEvent event = new TimelineEvent(); event.setTimestamp(System.currentTimeMillis()); event.setEventType(DSEvent.DS_CONTAINER_START.toString()); @@ -1064,16 +1073,23 @@ private static void publishContainerStartEvent(TimelineClient timelineClient, event.addEventInfo("Resources", container.getResource().toString()); entity.addEvent(event); - timelineClient.putEntities(entity); + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public TimelinePutResponse run() throws Exception { + return timelineClient.putEntities(entity); + } + }); } - private static void publishContainerEndEvent(TimelineClient timelineClient, - ContainerStatus container) throws IOException, YarnException { - TimelineEntity entity = new TimelineEntity(); + private static void publishContainerEndEvent( + final TimelineClient timelineClient, ContainerStatus container, + String domainId, UserGroupInformation ugi) + throws IOException, InterruptedException { + final TimelineEntity entity = new TimelineEntity(); entity.setEntityId(container.getContainerId().toString()); entity.setEntityType(DSEntity.DS_CONTAINER.toString()); - entity.addPrimaryFilter("user", - UserGroupInformation.getCurrentUser().getShortUserName()); + entity.setDomainId(domainId); + entity.addPrimaryFilter("user", ugi.getShortUserName()); TimelineEvent event = new TimelineEvent(); event.setTimestamp(System.currentTimeMillis()); event.setEventType(DSEvent.DS_CONTAINER_END.toString()); @@ -1081,22 +1097,33 @@ private static void publishContainerEndEvent(TimelineClient timelineClient, event.addEventInfo("Exit Status", container.getExitStatus()); entity.addEvent(event); - timelineClient.putEntities(entity); + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public TimelinePutResponse run() throws Exception { + return timelineClient.putEntities(entity); + } + }); } private static void publishApplicationAttemptEvent( - TimelineClient timelineClient, String appAttemptId, DSEvent appEvent) - throws IOException, YarnException { - TimelineEntity entity = new TimelineEntity(); + final TimelineClient timelineClient, String appAttemptId, + DSEvent appEvent, String domainId, UserGroupInformation ugi) + throws IOException, InterruptedException { + final TimelineEntity entity = new TimelineEntity(); entity.setEntityId(appAttemptId); entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString()); - entity.addPrimaryFilter("user", - UserGroupInformation.getCurrentUser().getShortUserName()); + entity.setDomainId(domainId); + entity.addPrimaryFilter("user", ugi.getShortUserName()); TimelineEvent event = new TimelineEvent(); event.setEventType(appEvent.toString()); event.setTimestamp(System.currentTimeMillis()); entity.addEvent(event); - timelineClient.putEntities(entity); + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public TimelinePutResponse run() throws Exception { + return timelineClient.putEntities(entity); + } + }); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index f3ce64c..ef4b0bb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -70,11 +70,14 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; +import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; /** * Client for Distributed Shell application submission to YARN. @@ -116,6 +119,7 @@ // Configuration private Configuration conf; private YarnClient yarnClient; + private TimelineClient timelineClient; // Application master specific info to register a new Application with RM/ASM private String appName = ""; // App master priority @@ -165,7 +169,19 @@ private long attemptFailuresValidityInterval = -1; // Debug flag - boolean debugFlag = false; + boolean debugFlag = false; + + // Timeline domain ID + private String domainId = null; + + // Flag to indicate whether to create the domain of the given ID + private boolean toCreate = false; + + // Timeline domain reader access control + private String viewACLs = null; + + // Timeline domain writer access control + private String modifyACLs = null; // Command line options private Options opts; @@ -222,6 +238,11 @@ public Client(Configuration conf) throws Exception { this.appMasterMainClass = appMasterMainClass; yarnClient = YarnClient.createYarnClient(); yarnClient.init(conf); + if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { + timelineClient = TimelineClient.createTimelineClient(); + timelineClient.init(conf); + } opts = new Options(); opts.addOption("appname", true, "Application Name. Default value - DistributedShell"); opts.addOption("priority", true, "Application Priority. Default 0"); @@ -256,6 +277,14 @@ public Client(Configuration conf) throws Exception { "If failure count reaches to maxAppAttempts, " + "the application will be failed."); opts.addOption("debug", false, "Dump out debug information"); + opts.addOption("domain", true, "ID of the timeline domain where the " + + "timeline entities will be put"); + opts.addOption("view_acls", true, "Users and groups that allowed to " + + "view the timeline entities in the given domain"); + opts.addOption("modify_acls", true, "Users and groups that allowed to " + + "modify the timeline entities in the given domain"); + opts.addOption("create", false, "Flag to indicate whether to create the " + + "domain specified with -domain."); opts.addOption("help", false, "Print usage"); } @@ -385,6 +414,18 @@ public boolean init(String[] args) throws ParseException { log4jPropFile = cliParser.getOptionValue("log_properties", ""); + // Get timeline domain options + if (cliParser.hasOption("domain")) { + domainId = cliParser.getOptionValue("domain"); + toCreate = cliParser.hasOption("create"); + if (cliParser.hasOption("view_acls")) { + viewACLs = cliParser.getOptionValue("view_acls"); + } + if (cliParser.hasOption("modify_acls")) { + modifyACLs = cliParser.getOptionValue("modify_acls"); + } + } + return true; } @@ -398,6 +439,9 @@ public boolean run() throws IOException, YarnException { LOG.info("Running Client"); yarnClient.start(); + if (timelineClient != null) { + timelineClient.start(); + } YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics(); LOG.info("Got Cluster metric info from ASM" @@ -431,6 +475,10 @@ public boolean run() throws IOException, YarnException { } } + if (toCreate) { + prepareTimelineDomain(); + } + // Get a new application id YarnClientApplication app = yarnClient.createApplication(); GetNewApplicationResponse appResponse = app.getNewApplicationResponse(); @@ -535,6 +583,9 @@ public boolean run() throws IOException, YarnException { env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation); env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp)); env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen)); + if (domainId != null && domainId.length() > 0) { + env.put(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN, domainId); + } // Add AppMaster.jar location to classpath // At some point we should not be required to add @@ -773,4 +824,27 @@ private void addToLocalResources(FileSystem fs, String fileSrcPath, scFileStatus.getLen(), scFileStatus.getModificationTime()); localResources.put(fileDstPath, scRsrc); } + + private void prepareTimelineDomain() { + if (timelineClient == null) { + return; + } + if (domainId == null) { + return; + } + try { + //TODO: we need to check and combine the existing timeline domain ACLs + TimelineDomain domain = new TimelineDomain(); + domain.setId(domainId); + domain.setReaders( + viewACLs != null && viewACLs.length() > 0 ? viewACLs : " "); + domain.setWriters( + modifyACLs != null && modifyACLs.length() > 0 ? modifyACLs : " "); + timelineClient.putDomain(domain); + LOG.info("Put the timeline domain: " + + TimelineUtils.dumpTimelineRecordtoJSON(domain)); + } catch (Exception e) { + LOG.error("Error when putting the timeline domain", e); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DSConstants.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DSConstants.java index 5912f14..fbaf2d4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DSConstants.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DSConstants.java @@ -44,4 +44,9 @@ * Used to validate the local resource. */ public static final String DISTRIBUTEDSHELLSCRIPTLEN = "DISTRIBUTEDSHELLSCRIPTLEN"; + + /** + * Environment key name denoting the timeline domain ID. + */ + public static final String DISTRIBUTEDSHELLTIMELINEDOMAIN = "DISTRIBUTEDSHELLTIMELINEDOMAIN"; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 6dff94c..cba0f0a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -32,7 +32,6 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -43,6 +42,7 @@ import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -145,7 +146,13 @@ public void testDSShell() throws Exception { "--container_memory", "128", "--container_vcores", - "1" + "1", + "--domain", + "TEST_DOMAIN", + "--view_acls", + "reader_user reader_group", + "--modify_acls", + "writer_user writer_group" }; LOG.info("Initializing DS Client"); @@ -198,7 +205,12 @@ public void run() { t.join(); LOG.info("Client run completed. Result=" + result); Assert.assertTrue(result.get()); - + + TimelineDomain domain = yarnCluster.getApplicationHistoryServer() + .getTimelineStore().getDomain("TEST_DOMAIN"); + Assert.assertNotNull(domain); + Assert.assertEquals("reader_user reader_group", domain.getReaders()); + Assert.assertEquals("writer_user writer_group", domain.getWriters()); TimelineEntities entitiesAttempts = yarnCluster .getApplicationHistoryServer() .getTimelineStore() @@ -210,6 +222,8 @@ public void run() { .size()); Assert.assertEquals(entitiesAttempts.getEntities().get(0).getEntityType() .toString(), ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString()); + Assert.assertEquals(domain.getId(), + entitiesAttempts.getEntities().get(0).getDomainId()); TimelineEntities entities = yarnCluster .getApplicationHistoryServer() .getTimelineStore() @@ -219,6 +233,8 @@ public void run() { Assert.assertEquals(2, entities.getEntities().size()); Assert.assertEquals(entities.getEntities().get(0).getEntityType() .toString(), ApplicationMaster.DSEntity.DS_CONTAINER.toString()); + Assert.assertEquals(domain.getId(), + entities.getEntities().get(0).getDomainId()); } /*