diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index df9f34b..e6ded00 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -24,6 +24,7 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.StringReader; +import java.lang.reflect.UndeclaredThrowableException; import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; @@ -86,6 +87,7 @@ import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; @@ -246,6 +248,9 @@ // File length needed for local resource private long shellScriptPathLen = 0; + // Timeline domain ID + private String domainId = null; + // Hardcoded path to shell script in launch container's local env private static final String ExecShellStringPath = Client.SCRIPT_PATH + ".sh"; private static final String ExecBatScripStringtPath = Client.SCRIPT_PATH @@ -465,7 +470,9 @@ public boolean init(String[] args) throws ParseException, IOException { shellScriptPathLen = Long.valueOf(envs .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)); } - + if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) { + domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN); + } if (!scriptPath.isEmpty() && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) { LOG.error("Illegal values in env for shell script path" + ", path=" @@ -515,13 +522,6 @@ private void printUsage(Options opts) { @SuppressWarnings({ "unchecked" }) public void run() throws YarnException, IOException { LOG.info("Starting ApplicationMaster"); - try { - publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), - DSEvent.DS_APP_ATTEMPT_START); - } catch (Exception e) { - LOG.error("App Attempt start event could not be published for " - + appAttemptID.toString(), e); - } // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class // are marked as LimitedPrivate @@ -548,6 +548,9 @@ public void run() throws YarnException, IOException { UserGroupInformation.createRemoteUser(appSubmitterUserName); appSubmitterUgi.addCredentials(credentials); + publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), + DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); + AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); amRMClient.init(conf); @@ -612,13 +615,9 @@ public void run() throws YarnException, IOException { amRMClient.addContainerRequest(containerAsk); } numRequestedContainers.set(numTotalContainers); - try { - publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), - DSEvent.DS_APP_ATTEMPT_END); - } catch (Exception e) { - LOG.error("App Attempt start event could not be published for " - + appAttemptID.toString(), e); - } + + publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), + DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); } @VisibleForTesting @@ -724,12 +723,8 @@ public void onContainersCompleted(List completedContainers) { LOG.info("Container completed successfully." + ", containerId=" + containerStatus.getContainerId()); } - try { - publishContainerEndEvent(timelineClient, containerStatus); - } catch (Exception e) { - LOG.error("Container start event could not be published for " - + containerStatus.getContainerId().toString(), e); - } + publishContainerEndEvent( + timelineClient, containerStatus, domainId, appSubmitterUgi); } // ask for more containers if any failed @@ -844,13 +839,9 @@ public void onContainerStarted(ContainerId containerId, if (container != null) { applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId()); } - try { - ApplicationMaster.publishContainerStartEvent( - applicationMaster.timelineClient, container); - } catch (Exception e) { - LOG.error("Container start event could not be published for " - + container.getId().toString(), e); - } + ApplicationMaster.publishContainerStartEvent( + applicationMaster.timelineClient, container, + applicationMaster.domainId, applicationMaster.appSubmitterUgi); } @Override @@ -1050,13 +1041,14 @@ private String readContent(String filePath) throws IOException { } } - private static void publishContainerStartEvent(TimelineClient timelineClient, - Container container) throws IOException, YarnException { - TimelineEntity entity = new TimelineEntity(); + private static void publishContainerStartEvent( + final TimelineClient timelineClient, Container container, String domainId, + UserGroupInformation ugi) { + final TimelineEntity entity = new TimelineEntity(); entity.setEntityId(container.getId().toString()); entity.setEntityType(DSEntity.DS_CONTAINER.toString()); - entity.addPrimaryFilter("user", - UserGroupInformation.getCurrentUser().getShortUserName()); + entity.setDomainId(domainId); + entity.addPrimaryFilter("user", ugi.getShortUserName()); TimelineEvent event = new TimelineEvent(); event.setTimestamp(System.currentTimeMillis()); event.setEventType(DSEvent.DS_CONTAINER_START.toString()); @@ -1064,16 +1056,28 @@ private static void publishContainerStartEvent(TimelineClient timelineClient, event.addEventInfo("Resources", container.getResource().toString()); entity.addEvent(event); - timelineClient.putEntities(entity); + try { + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public TimelinePutResponse run() throws Exception { + return timelineClient.putEntities(entity); + } + }); + } catch (Exception e) { + LOG.error("Container start event could not be published for " + + container.getId().toString(), + e instanceof UndeclaredThrowableException ? e.getCause() : e); + } } - private static void publishContainerEndEvent(TimelineClient timelineClient, - ContainerStatus container) throws IOException, YarnException { - TimelineEntity entity = new TimelineEntity(); + private static void publishContainerEndEvent( + final TimelineClient timelineClient, ContainerStatus container, + String domainId, UserGroupInformation ugi) { + final TimelineEntity entity = new TimelineEntity(); entity.setEntityId(container.getContainerId().toString()); entity.setEntityType(DSEntity.DS_CONTAINER.toString()); - entity.addPrimaryFilter("user", - UserGroupInformation.getCurrentUser().getShortUserName()); + entity.setDomainId(domainId); + entity.addPrimaryFilter("user", ugi.getShortUserName()); TimelineEvent event = new TimelineEvent(); event.setTimestamp(System.currentTimeMillis()); event.setEventType(DSEvent.DS_CONTAINER_END.toString()); @@ -1081,22 +1085,46 @@ private static void publishContainerEndEvent(TimelineClient timelineClient, event.addEventInfo("Exit Status", container.getExitStatus()); entity.addEvent(event); - timelineClient.putEntities(entity); + try { + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public TimelinePutResponse run() throws Exception { + return timelineClient.putEntities(entity); + } + }); + } catch (Exception e) { + LOG.error("Container end event could not be published for " + + container.getContainerId().toString(), + e instanceof UndeclaredThrowableException ? e.getCause() : e); + } } private static void publishApplicationAttemptEvent( - TimelineClient timelineClient, String appAttemptId, DSEvent appEvent) - throws IOException, YarnException { - TimelineEntity entity = new TimelineEntity(); + final TimelineClient timelineClient, String appAttemptId, + DSEvent appEvent, String domainId, UserGroupInformation ugi) { + final TimelineEntity entity = new TimelineEntity(); entity.setEntityId(appAttemptId); entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString()); - entity.addPrimaryFilter("user", - UserGroupInformation.getCurrentUser().getShortUserName()); + entity.setDomainId(domainId); + entity.addPrimaryFilter("user", ugi.getShortUserName()); TimelineEvent event = new TimelineEvent(); event.setEventType(appEvent.toString()); event.setTimestamp(System.currentTimeMillis()); entity.addEvent(event); - timelineClient.putEntities(entity); + try { + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public TimelinePutResponse run() throws Exception { + return timelineClient.putEntities(entity); + } + }); + } catch (Exception e) { + LOG.error("App Attempt " + + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end") + + " event could not be published for " + + appAttemptId.toString(), + e instanceof UndeclaredThrowableException ? e.getCause() : e); + } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index f3ce64c..4f12d5a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -70,11 +70,14 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; +import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; /** * Client for Distributed Shell application submission to YARN. @@ -165,7 +168,19 @@ private long attemptFailuresValidityInterval = -1; // Debug flag - boolean debugFlag = false; + boolean debugFlag = false; + + // Timeline domain ID + private String domainId = null; + + // Flag to indicate whether to create the domain of the given ID + private boolean toCreate = false; + + // Timeline domain reader access control + private String viewACLs = null; + + // Timeline domain writer access control + private String modifyACLs = null; // Command line options private Options opts; @@ -256,6 +271,14 @@ public Client(Configuration conf) throws Exception { "If failure count reaches to maxAppAttempts, " + "the application will be failed."); opts.addOption("debug", false, "Dump out debug information"); + opts.addOption("domain", true, "ID of the timeline domain where the " + + "timeline entities will be put"); + opts.addOption("view_acls", true, "Users and groups that allowed to " + + "view the timeline entities in the given domain"); + opts.addOption("modify_acls", true, "Users and groups that allowed to " + + "modify the timeline entities in the given domain"); + opts.addOption("create", false, "Flag to indicate whether to create the " + + "domain specified with -domain."); opts.addOption("help", false, "Print usage"); } @@ -385,6 +408,18 @@ public boolean init(String[] args) throws ParseException { log4jPropFile = cliParser.getOptionValue("log_properties", ""); + // Get timeline domain options + if (cliParser.hasOption("domain")) { + domainId = cliParser.getOptionValue("domain"); + toCreate = cliParser.hasOption("create"); + if (cliParser.hasOption("view_acls")) { + viewACLs = cliParser.getOptionValue("view_acls"); + } + if (cliParser.hasOption("modify_acls")) { + modifyACLs = cliParser.getOptionValue("modify_acls"); + } + } + return true; } @@ -431,6 +466,10 @@ public boolean run() throws IOException, YarnException { } } + if (domainId != null && domainId.length() > 0 && toCreate) { + prepareTimelineDomain(); + } + // Get a new application id YarnClientApplication app = yarnClient.createApplication(); GetNewApplicationResponse appResponse = app.getNewApplicationResponse(); @@ -535,6 +574,9 @@ public boolean run() throws IOException, YarnException { env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation); env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp)); env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen)); + if (domainId != null && domainId.length() > 0) { + env.put(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN, domainId); + } // Add AppMaster.jar location to classpath // At some point we should not be required to add @@ -773,4 +815,34 @@ private void addToLocalResources(FileSystem fs, String fileSrcPath, scFileStatus.getLen(), scFileStatus.getModificationTime()); localResources.put(fileDstPath, scRsrc); } + + private void prepareTimelineDomain() { + TimelineClient timelineClient = null; + if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { + timelineClient = TimelineClient.createTimelineClient(); + timelineClient.init(conf); + timelineClient.start(); + } else { + LOG.warn("Cannot put the domain " + domainId + + " because the timeline service is not enabled"); + return; + } + try { + //TODO: we need to check and combine the existing timeline domain ACLs + TimelineDomain domain = new TimelineDomain(); + domain.setId(domainId); + domain.setReaders( + viewACLs != null && viewACLs.length() > 0 ? viewACLs : " "); + domain.setWriters( + modifyACLs != null && modifyACLs.length() > 0 ? modifyACLs : " "); + timelineClient.putDomain(domain); + LOG.info("Put the timeline domain: " + + TimelineUtils.dumpTimelineRecordtoJSON(domain)); + } catch (Exception e) { + LOG.error("Error when putting the timeline domain", e); + } finally { + timelineClient.stop(); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DSConstants.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DSConstants.java index 5912f14..fbaf2d4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DSConstants.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DSConstants.java @@ -44,4 +44,9 @@ * Used to validate the local resource. */ public static final String DISTRIBUTEDSHELLSCRIPTLEN = "DISTRIBUTEDSHELLSCRIPTLEN"; + + /** + * Environment key name denoting the timeline domain ID. + */ + public static final String DISTRIBUTEDSHELLTIMELINEDOMAIN = "DISTRIBUTEDSHELLTIMELINEDOMAIN"; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 6dff94c..1a62359 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -32,7 +32,6 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -43,6 +42,7 @@ import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -145,7 +146,14 @@ public void testDSShell() throws Exception { "--container_memory", "128", "--container_vcores", - "1" + "1", + "--domain", + "TEST_DOMAIN", + "--view_acls", + "reader_user reader_group", + "--modify_acls", + "writer_user writer_group", + "--create" }; LOG.info("Initializing DS Client"); @@ -198,7 +206,14 @@ public void run() { t.join(); LOG.info("Client run completed. Result=" + result); Assert.assertTrue(result.get()); - + + Thread.sleep(5000); + + TimelineDomain domain = yarnCluster.getApplicationHistoryServer() + .getTimelineStore().getDomain("TEST_DOMAIN"); + Assert.assertNotNull(domain); + Assert.assertEquals("reader_user reader_group", domain.getReaders()); + Assert.assertEquals("writer_user writer_group", domain.getWriters()); TimelineEntities entitiesAttempts = yarnCluster .getApplicationHistoryServer() .getTimelineStore() @@ -210,6 +225,8 @@ public void run() { .size()); Assert.assertEquals(entitiesAttempts.getEntities().get(0).getEntityType() .toString(), ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString()); + Assert.assertEquals(domain.getId(), + entitiesAttempts.getEntities().get(0).getDomainId()); TimelineEntities entities = yarnCluster .getApplicationHistoryServer() .getTimelineStore() @@ -219,6 +236,8 @@ public void run() { Assert.assertEquals(2, entities.getEntities().size()); Assert.assertEquals(entities.getEntities().get(0).getEntityType() .toString(), ApplicationMaster.DSEntity.DS_CONTAINER.toString()); + Assert.assertEquals(domain.getId(), + entities.getEntities().get(0).getDomainId()); } /* diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index 13ce074..fbddd14 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -210,9 +210,6 @@ private ClientResponse doPosting(Object obj, String path) throws IOException, Ya @VisibleForTesting public ClientResponse doPostingObject(Object object, String path) { WebResource webResource = client.resource(resURI); - if (path != null) { - webResource.path(path); - } if (path == null) { return webResource.accept(MediaType.APPLICATION_JSON) .type(MediaType.APPLICATION_JSON)