diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index a9a7091..eda1ae3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -188,7 +188,8 @@ private AMRMClientAsync amRMClient; // In both secure and non-secure modes, this points to the job-submitter. - private UserGroupInformation appSubmitterUgi; + @VisibleForTesting + UserGroupInformation appSubmitterUgi; // Handle to communicate with the Node Manager private NMClientAsync nmClientAsync; @@ -270,7 +271,8 @@ private List launchThreads = new ArrayList(); // Timeline Client - private TimelineClient timelineClient; + @VisibleForTesting + TimelineClient timelineClient; private final String linux_bash_command = "bash"; private final String windows_command = "cmd /c"; @@ -496,18 +498,6 @@ public boolean init(String[] args) throws ParseException, IOException { } requestPriority = Integer.parseInt(cliParser .getOptionValue("priority", "0")); - - if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { - // Creating the Timeline Client - timelineClient = TimelineClient.createTimelineClient(); - timelineClient.init(conf); - timelineClient.start(); - } else { - timelineClient = null; - LOG.warn("Timeline service is not enabled"); - } - return true; } @@ -527,7 +517,7 @@ private void printUsage(Options opts) { * @throws IOException */ @SuppressWarnings({ "unchecked" }) - public void run() throws YarnException, IOException { + public void run() throws YarnException, IOException, InterruptedException { LOG.info("Starting ApplicationMaster"); // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class @@ -554,11 +544,7 @@ public void run() throws YarnException, IOException { appSubmitterUgi = UserGroupInformation.createRemoteUser(appSubmitterUserName); appSubmitterUgi.addCredentials(credentials); - - if(timelineClient != null) { - publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), - DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); - } + AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); @@ -570,6 +556,13 @@ public void run() throws YarnException, IOException { nmClientAsync.init(conf); nmClientAsync.start(); + + + if(timelineClient != null) { + publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), + DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); + } + // Setup local RPC Server to accept status requests directly from clients // TODO need to setup a protocol for client to be able to communicate to // the RPC server @@ -625,6 +618,8 @@ public void run() throws YarnException, IOException { } numRequestedContainers.set(numTotalContainers); + startTimelineClient(conf); + if(timelineClient != null) { publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); @@ -632,6 +627,31 @@ public void run() throws YarnException, IOException { } @VisibleForTesting + void startTimelineClient(final Configuration conf) + throws YarnException, IOException, InterruptedException { + try { + appSubmitterUgi.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { + // Creating the Timeline Client + timelineClient = TimelineClient.createTimelineClient(); + timelineClient.init(conf); + timelineClient.start(); + } else { + timelineClient = null; + LOG.warn("Timeline service is not enabled"); + } + return null; + } + }); + } catch (UndeclaredThrowableException e) { + throw new YarnException(e.getCause()); + } + } + + @VisibleForTesting NMCallbackHandler createNMCallbackHandler() { return new NMCallbackHandler(this); } @@ -1104,18 +1124,11 @@ private static void publishContainerEndEvent( event.addEventInfo("State", container.getState().name()); event.addEventInfo("Exit Status", container.getExitStatus()); entity.addEvent(event); - try { - ugi.doAs(new PrivilegedExceptionAction() { - @Override - public TimelinePutResponse run() throws Exception { - return timelineClient.putEntities(entity); - } - }); - } catch (Exception e) { + timelineClient.putEntities(entity); + } catch (YarnException | IOException e) { LOG.error("Container end event could not be published for " - + container.getContainerId().toString(), - e instanceof UndeclaredThrowableException ? e.getCause() : e); + + container.getContainerId().toString(), e); } } @@ -1131,20 +1144,13 @@ private static void publishApplicationAttemptEvent( event.setEventType(appEvent.toString()); event.setTimestamp(System.currentTimeMillis()); entity.addEvent(event); - try { - ugi.doAs(new PrivilegedExceptionAction() { - @Override - public TimelinePutResponse run() throws Exception { - return timelineClient.putEntities(entity); - } - }); - } catch (Exception e) { + timelineClient.putEntities(entity); + } catch (YarnException | IOException e) { LOG.error("App Attempt " + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end") + " event could not be published for " - + appAttemptId.toString(), - e instanceof UndeclaredThrowableException ? e.getCause() : e); + + appAttemptId.toString(), e); } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java new file mode 100644 index 0000000..11e840a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.distributedshell; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.Assert; +import org.junit.Test; + +public class TestDSAppMaster { + + @Test + public void testTimelineClientInDSAppMaster() throws Exception { + ApplicationMaster appMaster = new ApplicationMaster(); + appMaster.appSubmitterUgi = + UserGroupInformation.createUserForTesting("foo", new String[]{"bar"}); + Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + appMaster.startTimelineClient(conf); + Assert.assertEquals(appMaster.appSubmitterUgi, + ((TimelineClientImpl)appMaster.timelineClient).getUgi()); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSFailedAppMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSFailedAppMaster.java index f3ab4b7..26022d4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSFailedAppMaster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSFailedAppMaster.java @@ -29,7 +29,7 @@ private static final Log LOG = LogFactory.getLog(TestDSFailedAppMaster.class); @Override - public void run() throws YarnException, IOException { + public void run() throws YarnException, IOException, InterruptedException { super.run(); // for the 2nd attempt. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index df6c7a4..0ac72dc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -642,4 +642,9 @@ private static void printUsage() { new HelpFormatter().printHelp("TimelineClient", opts); } + @VisibleForTesting + @Private + public UserGroupInformation getUgi() { + return authUgi; + } }