diff --git hadoop-yarn-project/hadoop-yarn/bin/yarn hadoop-yarn-project/hadoop-yarn/bin/yarn index 32d0c8b..38fc05e 100644 --- hadoop-yarn-project/hadoop-yarn/bin/yarn +++ hadoop-yarn-project/hadoop-yarn/bin/yarn @@ -182,9 +182,10 @@ if [ "$COMMAND" = "classpath" ] ; then elif [ "$COMMAND" = "rmadmin" ] ; then CLASS='org.apache.hadoop.yarn.client.cli.RMAdminCLI' YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS" -elif [ "$COMMAND" = "application" ] ; then +elif [ "$COMMAND" = "application" ] || [ "$COMMAND" = "applicationattempt" ] || [ "$COMMAND" = "container" ]; then CLASS=org.apache.hadoop.yarn.client.cli.ApplicationCLI YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS" + set -- $COMMAND $@ elif [ "$COMMAND" = "node" ] ; then CLASS=org.apache.hadoop.yarn.client.cli.NodeCLI YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS" diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AHSClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AHSClient.java new file mode 100644 index 0000000..e3f7453 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AHSClient.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.client.api.impl.AHSClientImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; + +@InterfaceAudience.Public +@InterfaceStability.Stable +public abstract class AHSClient extends AbstractService { + + /** + * Create a new instance of AHSClient. + */ + @Public + public static AHSClient createAHSClient() { + AHSClient client = new AHSClientImpl(); + return client; + } + + @Private + public AHSClient(String name) { + super(name); + } + + /** + *

+ * Get a report of the given Application. + *

+ * + *

+ * In secure mode, YARN verifies access to the application, queue + * etc. before accepting the request. + *

+ * + *

+ * If the user does not have VIEW_APP access then the following + * fields in the report will be set to stubbed values: + *

+ *

+ * + * @param appId + * {@link ApplicationId} of the application that needs a report + * @return application report + * @throws YarnException + * @throws IOException + */ + public abstract ApplicationReport getApplicationReport(ApplicationId appId) + throws YarnException, IOException; + + /** + *

+ * Get a report (ApplicationReport) of all Applications in the cluster. + *

+ * + *

+ * If the user does not have VIEW_APP access for an application + * then the corresponding report will be filtered as described in + * {@link #getApplicationReport(ApplicationId)}. + *

+ * + * @return a list of reports of all running applications + * @throws YarnException + * @throws IOException + */ + public abstract List getApplications() + throws YarnException, IOException; + + /** + *

+ * Get a report of the given ApplicationAttempt. + *

+ * + *

+ * In secure mode, YARN verifies access to the application, queue + * etc. before accepting the request. + *

+ * + *

+ * If the user does not have VIEW_APP access then the following + * fields in the report will be set to stubbed values: + *

+ *

+ * + * @param applicationAttemptId + * {@link ApplicationId} of the application that needs a report + * @return application report + * @throws YarnException + * @throws IOException + */ + public abstract ApplicationAttemptReport getApplicationAttemptReport( + ApplicationAttemptId appAttemptId) throws YarnException, IOException; + + /** + *

+ * Get a report of all (ApplicationAttempts) of Application in the cluster. + *

+ * + *

+ * If the user does not have VIEW_APP access for an application + * then the corresponding report will be filtered as described in + * {@link #getApplicationReport(ApplicationId)}. + *

+ * + * @param applicationId + * @return a list of reports of all running applications + * @throws YarnException + * @throws IOException + */ + public abstract List getApplicationAttempts( + ApplicationId appId) throws YarnException, IOException; + + /** + *

+ * Get a report of the given Container. + *

+ * + *

+ * In secure mode, YARN verifies access to the application, queue + * etc. before accepting the request. + *

+ * + *

+ * If the user does not have VIEW_APP access then the following + * fields in the report will be set to stubbed values: + *

+ *

+ * + * @param containerId + * {@link ApplicationId} of the application that needs a report + * @return application report + * @throws YarnException + * @throws IOException + */ + public abstract ContainerReport getContainerReport(ContainerId containerId) + throws YarnException, IOException; + + /** + *

+ * Get a report of all (Containers) of ApplicationAttempt in the cluster. + *

+ * + *

+ * If the user does not have VIEW_APP access for an application + * then the corresponding report will be filtered as described in + *

+ * + * @param applicationAttemptId + * @return a list of reports of all running applications + * @throws YarnException + * @throws IOException + */ + public abstract List getContainers( + ApplicationAttemptId applicationAttemptId) throws YarnException, + IOException; +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AHSClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AHSClientImpl.java new file mode 100644 index 0000000..b165fd4 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AHSClientImpl.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.client.AHSProxy; +import org.apache.hadoop.yarn.client.api.AHSClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.util.Records; + +@Private +@Unstable +public class AHSClientImpl extends AHSClient { + + private static final Log LOG = LogFactory.getLog(AHSClientImpl.class); + + protected ApplicationHistoryProtocol ahsClient; + protected InetSocketAddress ahsAddress; + protected long statePollIntervalMillis; + + public AHSClientImpl() { + super(AHSClientImpl.class.getName()); + } + + private static InetSocketAddress getAHSAddress(Configuration conf) { + return conf.getSocketAddr(YarnConfiguration.AHS_ADDRESS, + YarnConfiguration.DEFAULT_AHS_ADDRESS, + YarnConfiguration.DEFAULT_AHS_PORT); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.ahsAddress = getAHSAddress(conf); + statePollIntervalMillis = conf.getLong( + YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS, + YarnConfiguration.DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + try { + ahsClient = AHSProxy.createAHSProxy(getConfig(), + ApplicationHistoryProtocol.class, this.ahsAddress); + } catch (IOException e) { + throw new YarnRuntimeException(e); + } + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (this.ahsClient != null) { + RPC.stopProxy(this.ahsClient); + } + super.serviceStop(); + } + + @Override + public ApplicationReport getApplicationReport(ApplicationId appId) + throws YarnException, IOException { + GetApplicationReportRequest request = Records + .newRecord(GetApplicationReportRequest.class); + request.setApplicationId(appId); + GetApplicationReportResponse response = ahsClient + .getApplicationReport(request); + return response.getApplicationReport(); + } + + @Override + public List getApplications() throws YarnException, + IOException { + GetApplicationsRequest request = GetApplicationsRequest.newInstance(null, + null); + GetApplicationsResponse response = ahsClient.getApplications(request); + return response.getApplicationList(); + } + + @Override + public ApplicationAttemptReport getApplicationAttemptReport( + ApplicationAttemptId applicationAttemptId) throws YarnException, + IOException { + GetApplicationAttemptReportRequest request = Records + .newRecord(GetApplicationAttemptReportRequest.class); + request.setApplicationAttemptId(applicationAttemptId); + GetApplicationAttemptReportResponse response = ahsClient + .getApplicationAttemptReport(request); + return response.getApplicationAttemptReport(); + } + + @Override + public List getApplicationAttempts( + ApplicationId appId) throws YarnException, IOException { + GetApplicationAttemptsRequest request = GetApplicationAttemptsRequest + .newInstance(appId); + GetApplicationAttemptsResponse response = ahsClient + .getApplicationAttempts(request); + return response.getApplicationAttemptList(); + } + + @Override + public ContainerReport getContainerReport(ContainerId containerId) + throws YarnException, IOException { + GetContainerReportRequest request = Records + .newRecord(GetContainerReportRequest.class); + request.setContainerId(containerId); + GetContainerReportResponse response = ahsClient.getContainerReport(request); + return response.getContainerReport(); + } + + @Override + public List getContainers( + ApplicationAttemptId applicationAttemptId) throws YarnException, + IOException { + GetContainersRequest request = GetContainersRequest + .newInstance(applicationAttemptId); + GetContainersResponse response = ahsClient.getContainers(request); + return response.getContainerList(); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java index a7b7d65..8fd6d12 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java @@ -35,9 +35,12 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -46,13 +49,19 @@ @Private @Unstable public class ApplicationCLI extends YarnCLI { - private static final String APPLICATIONS_PATTERN = - "%30s\t%20s\t%20s\t%10s\t%10s\t%18s\t%18s\t%15s\t%35s" + - System.getProperty("line.separator"); + private static final String APPLICATIONS_PATTERN = "%30s\t%20s\t%20s\t%10s\t%10s\t%18s\t%18s\t%15s\t%35s" + + System.getProperty("line.separator"); + private static final String APPLICATION_ATTEMPTS_PATTERN = "%30s\t%20s\t%20s\t%35s" + + System.getProperty("line.separator"); + private static final String CONTAINER_PATTERN = "%30s\t%20s\t%20s\t%20s\t%20s\t%35s" + + System.getProperty("line.separator"); private static final String APP_TYPE_CMD = "appTypes"; - private static final String APP_STATE_CMD ="appStates"; + private static final String APP_STATE_CMD = "appStates"; private static final String ALLSTATES_OPTION = "ALL"; + public static final String APPLICATION = "application"; + public static final String APPLICATION_ATTEMPT = "applicationattempt"; + public static final String CONTAINER = "container"; private boolean allAppStates; @@ -69,23 +78,33 @@ public static void main(String[] args) throws Exception { public int run(String[] args) throws Exception { Options opts = new Options(); - opts.addOption(STATUS_CMD, true, "Prints the status of the application."); - opts.addOption(LIST_CMD, false, "List applications from the RM. " + - "Supports optional use of -appTypes to filter applications " + - "based on application type, " + - "and -appStates to filter applications based on application state"); + opts.addOption(STATUS_CMD, true, + "Prints the status of the application/Attempts/Containers."); + if (args[0].compareToIgnoreCase(APPLICATION) == 0) { + opts.addOption(LIST_CMD, false, "List applications from the RM. " + + "Supports optional use of -appTypes to filter applications " + + "based on application type, " + + "and -appStates to filter applications based on application state"); + } else if (args[0].compareToIgnoreCase(APPLICATION_ATTEMPT) == 0 + || args[0].compareToIgnoreCase(CONTAINER) == 0) { + opts.addOption(LIST_CMD, true, + "List application attempts for aplication from AHS. "); + } else if (args[0].compareToIgnoreCase("container") == 0) { + opts.addOption(LIST_CMD, true, + "List containers for application attempts from AHS. "); + } opts.addOption(KILL_CMD, true, "Kills the application."); opts.addOption(HELP_CMD, false, "Displays help for all commands."); - Option appTypeOpt = new Option(APP_TYPE_CMD, true, "Works with -list to " + - "filter applications based on " + - "input comma-separated list of application types."); + Option appTypeOpt = new Option(APP_TYPE_CMD, true, "Works with -list to " + + "filter applications based on " + + "input comma-separated list of application types."); appTypeOpt.setValueSeparator(','); appTypeOpt.setArgs(Option.UNLIMITED_VALUES); appTypeOpt.setArgName("Types"); opts.addOption(appTypeOpt); - Option appStateOpt = new Option(APP_STATE_CMD, true, "Works with -list " + - "to filter applications based on input comma-separated list of " + - "application states. " + getAllValidApplicationStates()); + Option appStateOpt = new Option(APP_STATE_CMD, true, "Works with -list " + + "to filter applications based on input comma-separated list of " + + "application states. " + getAllValidApplicationStates()); appStateOpt.setValueSeparator(','); appStateOpt.setArgs(Option.UNLIMITED_VALUES); appStateOpt.setArgName("States"); @@ -104,50 +123,70 @@ public int run(String[] args) throws Exception { } if (cliParser.hasOption(STATUS_CMD)) { - if (args.length != 2) { + if (args.length != 3) { printUsage(opts); return exitCode; } - printApplicationReport(cliParser.getOptionValue(STATUS_CMD)); + if (args[0].compareToIgnoreCase(APPLICATION) == 0) { + printApplicationReport(cliParser.getOptionValue(STATUS_CMD)); + } else if (args[0].compareToIgnoreCase(APPLICATION_ATTEMPT) == 0) { + printApplicationAttemptReport(cliParser.getOptionValue(STATUS_CMD)); + } else if (args[0].compareToIgnoreCase(CONTAINER) == 0) { + printContainerReport(cliParser.getOptionValue(STATUS_CMD)); + } } else if (cliParser.hasOption(LIST_CMD)) { - allAppStates = false; - Set appTypes = new HashSet(); - if(cliParser.hasOption(APP_TYPE_CMD)) { - String[] types = cliParser.getOptionValues(APP_TYPE_CMD); - if (types != null) { - for (String type : types) { - if (!type.trim().isEmpty()) { - appTypes.add(type.toUpperCase().trim()); + if (args[0].compareToIgnoreCase(APPLICATION) == 0) { + allAppStates = false; + Set appTypes = new HashSet(); + if (cliParser.hasOption(APP_TYPE_CMD)) { + String[] types = cliParser.getOptionValues(APP_TYPE_CMD); + if (types != null) { + for (String type : types) { + if (!type.trim().isEmpty()) { + appTypes.add(type.toUpperCase().trim()); + } } } } - } - EnumSet appStates = - EnumSet.noneOf(YarnApplicationState.class); - if (cliParser.hasOption(APP_STATE_CMD)) { - String[] states = cliParser.getOptionValues(APP_STATE_CMD); - if (states != null) { - for (String state : states) { - if (!state.trim().isEmpty()) { - if (state.trim().equalsIgnoreCase(ALLSTATES_OPTION)) { - allAppStates = true; - break; - } - try { - appStates.add(YarnApplicationState.valueOf(state.toUpperCase() - .trim())); - } catch (IllegalArgumentException ex) { - sysout.println("The application state " + state - + " is invalid."); - sysout.println(getAllValidApplicationStates()); - return exitCode; + EnumSet appStates = EnumSet + .noneOf(YarnApplicationState.class); + if (cliParser.hasOption(APP_STATE_CMD)) { + String[] states = cliParser.getOptionValues(APP_STATE_CMD); + if (states != null) { + for (String state : states) { + if (!state.trim().isEmpty()) { + if (state.trim().equalsIgnoreCase(ALLSTATES_OPTION)) { + allAppStates = true; + break; + } + try { + appStates.add(YarnApplicationState.valueOf(state + .toUpperCase().trim())); + } catch (IllegalArgumentException ex) { + sysout.println("The application state " + state + + " is invalid."); + sysout.println(getAllValidApplicationStates()); + return exitCode; + } } } } } + listApplications(appTypes, appStates); + } else if (args[0].compareToIgnoreCase(APPLICATION_ATTEMPT) == 0) { + if (args.length != 3) { + printUsage(opts); + return exitCode; + } + listApplicationAttempts(cliParser.getOptionValue(LIST_CMD)); + } else if (args[0].compareToIgnoreCase(CONTAINER) == 0) { + if (args.length != 3) { + printUsage(opts); + return exitCode; + } + listContainers(cliParser.getOptionValue(LIST_CMD)); } - listApplications(appTypes, appStates); } else if (cliParser.hasOption(KILL_CMD)) { if (args.length != 2) { printUsage(opts); @@ -175,8 +214,83 @@ void printUsage(Options opts) { } /** - * Lists the applications matching the given application Types - * And application States present in the Resource Manager + * Prints the application report for an application id. + * + * @param applicationId + * @throws YarnException + */ + private void printApplicationAttemptReport(String applicationAttemptId) + throws YarnException, IOException { + ApplicationAttemptReport appAttemptReport = historyClient + .getApplicationAttemptReport(ConverterUtils + .toApplicationAttemptId(applicationAttemptId)); + // Use PrintWriter.println, which uses correct platform line ending. + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintWriter appReportStr = new PrintWriter(baos); + if (appAttemptReport != null) { + appReportStr.println("Application Attempt Report : "); + appReportStr.print("\tApplicationAttempt-Id : "); + appReportStr.println(appAttemptReport.getApplicationAttemptId()); + appReportStr.print("\tState : "); + appReportStr.println(appAttemptReport.getYarnApplicationAttemptState()); + appReportStr.print("\tAMContainer : "); + appReportStr.println(appAttemptReport.getAMContainerId().toString()); + appReportStr.print("\tTracking-URL : "); + appReportStr.println(appAttemptReport.getTrackingUrl()); + appReportStr.print("\tRPC Port : "); + appReportStr.println(appAttemptReport.getRpcPort()); + appReportStr.print("\tAM Host : "); + appReportStr.println(appAttemptReport.getHost()); + appReportStr.print("\tDiagnostics : "); + appReportStr.print(appAttemptReport.getDiagnostics()); + } else { + appReportStr.print("Application Attempt with id '" + applicationAttemptId + + "' doesn't exist in History Server."); + } + appReportStr.close(); + sysout.println(baos.toString("UTF-8")); + } + + /** + * Prints the container report for an application attempt id. + * + * @param applicationAttemptId + * @throws YarnException + */ + private void printContainerReport(String containerId) throws YarnException, + IOException { + ContainerReport containerReport = historyClient + .getContainerReport((ConverterUtils.toContainerId(containerId))); + // Use PrintWriter.println, which uses correct platform line ending. + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintWriter appReportStr = new PrintWriter(baos); + if (containerReport != null) { + appReportStr.println("Container Report : "); + appReportStr.print("\tContainer-Id : "); + appReportStr.println(containerReport.getContainerId()); + appReportStr.print("\tStart-Time : "); + appReportStr.println(containerReport.getStartTime()); + appReportStr.print("\tFinish-Time : "); + appReportStr.println(containerReport.getFinishTime()); + appReportStr.print("\tState : "); + appReportStr.println(containerReport.getContainerState()); + appReportStr.print("\tLOG-URL : "); + appReportStr.println(containerReport.getLogUrl()); + appReportStr.print("\tHost : "); + appReportStr.println(containerReport.getAssignedNode()); + appReportStr.print("\tDiagnostics : "); + appReportStr.print(containerReport.getDiagnosticsInfo()); + } else { + appReportStr.print("Container with id '" + containerId + + "' doesn't exist in Hostory Server."); + } + appReportStr.close(); + sysout.println(baos.toString("UTF-8")); + } + + /** + * Lists the applications matching the given application Types And application + * States present in the Resource Manager * * @param appTypes * @param appStates @@ -188,7 +302,7 @@ private void listApplications(Set appTypes, IOException { PrintWriter writer = new PrintWriter(sysout); if (allAppStates) { - for(YarnApplicationState appState : YarnApplicationState.values()) { + for (YarnApplicationState appState : YarnApplicationState.values()) { appStates.add(appState); } } else { @@ -199,23 +313,24 @@ private void listApplications(Set appTypes, } } - List appsReport = - client.getApplications(appTypes, appStates); + List appsReport = client.getApplications(appTypes, + appStates); - writer - .println("Total number of applications (application-types: " + appTypes - + " and states: " + appStates + ")" + ":" + appsReport.size()); - writer.printf(APPLICATIONS_PATTERN, "Application-Id", - "Application-Name","Application-Type", "User", "Queue", - "State", "Final-State","Progress", "Tracking-URL"); + writer.println("Total number of applications (application-types: " + + appTypes + " and states: " + appStates + ")" + ":" + + appsReport.size()); + writer.printf(APPLICATIONS_PATTERN, "Application-Id", "Application-Name", + "Application-Type", "User", "Queue", "State", "Final-State", + "Progress", "Tracking-URL"); for (ApplicationReport appReport : appsReport) { DecimalFormat formatter = new DecimalFormat("###.##%"); String progress = formatter.format(appReport.getProgress()); writer.printf(APPLICATIONS_PATTERN, appReport.getApplicationId(), - appReport.getName(),appReport.getApplicationType(), appReport.getUser(), - appReport.getQueue(),appReport.getYarnApplicationState(), - appReport.getFinalApplicationStatus(),progress, - appReport.getOriginalTrackingUrl()); + appReport.getName(), appReport.getApplicationType(), appReport + .getUser(), appReport.getQueue(), appReport + .getYarnApplicationState(), + appReport.getFinalApplicationStatus(), progress, appReport + .getOriginalTrackingUrl()); } writer.flush(); } @@ -227,8 +342,8 @@ private void listApplications(Set appTypes, * @throws YarnException * @throws IOException */ - private void killApplication(String applicationId) - throws YarnException, IOException { + private void killApplication(String applicationId) throws YarnException, + IOException { ApplicationId appId = ConverterUtils.toApplicationId(applicationId); ApplicationReport appReport = client.getApplicationReport(appId); if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED @@ -249,8 +364,19 @@ private void killApplication(String applicationId) */ private void printApplicationReport(String applicationId) throws YarnException, IOException { - ApplicationReport appReport = client.getApplicationReport(ConverterUtils - .toApplicationId(applicationId)); + ApplicationReport appReport = null; + try { + appReport = client.getApplicationReport(ConverterUtils + .toApplicationId(applicationId)); + } catch (YarnException e) { + if (!(e.getClass() == ApplicationNotFoundException.class)) { + throw e; + } + } + if (appReport == null) { + appReport = historyClient.getApplicationReport(ConverterUtils + .toApplicationId(applicationId)); + } // Use PrintWriter.println, which uses correct platform line ending. ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintWriter appReportStr = new PrintWriter(baos); @@ -296,14 +422,63 @@ private void printApplicationReport(String applicationId) private String getAllValidApplicationStates() { StringBuilder sb = new StringBuilder(); - sb.append("The valid application state can be" - + " one of the following: "); + sb.append("The valid application state can be" + " one of the following: "); sb.append(ALLSTATES_OPTION + ","); - for (YarnApplicationState appState : YarnApplicationState - .values()) { - sb.append(appState+","); + for (YarnApplicationState appState : YarnApplicationState.values()) { + sb.append(appState + ","); } String output = sb.toString(); - return output.substring(0, output.length()-1); + return output.substring(0, output.length() - 1); + } + + /** + * Lists the application attempts matching the given applicationid + * + * @param applicationId + * @throws YarnException + * @throws IOException + */ + private void listApplicationAttempts(String appId) throws YarnException, + IOException { + PrintWriter writer = new PrintWriter(sysout); + + List appAttemptsReport = historyClient + .getApplicationAttempts(ConverterUtils.toApplicationId(appId)); + writer.println("Total number of application attempts " + ":" + + appAttemptsReport.size()); + writer.printf(APPLICATION_ATTEMPTS_PATTERN, "ApplicationAttempt-Id", + "State", "AM-Container-Id", "Tracking-URL"); + for (ApplicationAttemptReport appAttemptReport : appAttemptsReport) { + writer.printf(APPLICATION_ATTEMPTS_PATTERN, appAttemptReport + .getApplicationAttemptId(), appAttemptReport + .getYarnApplicationAttemptState(), appAttemptReport + .getAMContainerId().toString(), appAttemptReport.getTrackingUrl()); + } + writer.flush(); + } + + /** + * Lists the containers matching the given application attempts + * + * @param appAttemptId + * @throws YarnException + * @throws IOException + */ + private void listContainers(String appAttemptId) throws YarnException, + IOException { + PrintWriter writer = new PrintWriter(sysout); + + List appsReport = historyClient + .getContainers(ConverterUtils.toApplicationAttemptId(appAttemptId)); + writer.println("Total number of containers " + ":" + appsReport.size()); + writer.printf(CONTAINER_PATTERN, "Container-Id", "Start Time", + "Finish Time", "State", "Host", "LOG-URL"); + for (ContainerReport containerReport : appsReport) { + writer.printf(CONTAINER_PATTERN, containerReport.getContainerId(), + containerReport.getStartTime(), containerReport.getFinishTime(), + containerReport.getContainerState(), containerReport + .getAssignedNode(), containerReport.getLogUrl()); + } + writer.flush(); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java index 921c135..c0eeb05 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.util.Tool; +import org.apache.hadoop.yarn.client.api.AHSClient; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -37,12 +38,16 @@ protected PrintStream sysout; protected PrintStream syserr; protected YarnClient client; + protected AHSClient historyClient; public YarnCLI() { super(new YarnConfiguration()); client = YarnClient.createYarnClient(); client.init(getConf()); client.start(); + historyClient = AHSClient.createAHSClient(); + historyClient.init(getConf()); + historyClient.start(); } public void setSysOutPrintStream(PrintStream sysout) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAHSClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAHSClient.java new file mode 100644 index 0000000..fce38e2 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAHSClient.java @@ -0,0 +1,382 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.AHSClient; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.junit.Test; + +public class TestAHSClient { + + @Test + public void testClientStop() { + Configuration conf = new Configuration(); + ResourceManager rm = new ResourceManager(); + rm.init(conf); + rm.start(); + + AHSClient client = AHSClient.createAHSClient(); + client.init(conf); + client.start(); + client.stop(); + rm.stop(); + } + + @Test(timeout = 10000) + public void testGetApplications() throws YarnException, IOException { + Configuration conf = new Configuration(); + final AHSClient client = new MockAHSClient(); + client.init(conf); + client.start(); + + List expectedReports = ((MockAHSClient) client) + .getReports(); + + List reports = client.getApplications(); + Assert.assertEquals(reports, expectedReports); + + reports = client.getApplications(); + Assert.assertEquals(reports.size(), 4); + client.stop(); + } + + @Test(timeout = 10000) + public void testGetApplicationReport() throws YarnException, IOException { + Configuration conf = new Configuration(); + final AHSClient client = new MockAHSClient(); + client.init(conf); + client.start(); + + List expectedReports = ((MockAHSClient) client) + .getReports(); + ApplicationId applicationId = ApplicationId.newInstance(1234, 5); + ApplicationReport report = client.getApplicationReport(applicationId); + Assert.assertEquals(report, expectedReports.get(0)); + Assert.assertEquals(report.getApplicationId().toString(), expectedReports + .get(0).getApplicationId().toString()); + client.stop(); + } + + @Test(timeout = 10000) + public void testGetApplicationAttempts() throws YarnException, IOException { + Configuration conf = new Configuration(); + final AHSClient client = new MockAHSClient(); + client.init(conf); + client.start(); + + List expectedReports = ((MockAHSClient) client) + .getReports(); + + ApplicationId applicationId = ApplicationId.newInstance(1234, 5); + List reports = client + .getApplicationAttempts(applicationId); + Assert.assertNotNull(reports); + Assert.assertEquals(reports.get(0).getApplicationAttemptId().toString(), + expectedReports.get(0).getCurrentApplicationAttemptId().toString()); + client.stop(); + } + + @Test(timeout = 10000) + public void testGetApplicationAttempt() throws YarnException, IOException { + Configuration conf = new Configuration(); + final AHSClient client = new MockAHSClient(); + client.init(conf); + client.start(); + + List expectedReports = ((MockAHSClient) client) + .getReports(); + + ApplicationId applicationId = ApplicationId.newInstance(1234, 5); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( + applicationId, 1); + ApplicationAttemptReport report = client + .getApplicationAttemptReport(appAttemptId); + Assert.assertNotNull(report); + Assert.assertEquals(report.getApplicationAttemptId().toString(), + expectedReports.get(0).getCurrentApplicationAttemptId().toString()); + client.stop(); + } + + @Test(timeout = 10000) + public void testGetContainers() throws YarnException, IOException { + Configuration conf = new Configuration(); + final AHSClient client = new MockAHSClient(); + client.init(conf); + client.start(); + + List expectedReports = ((MockAHSClient) client) + .getReports(); + + ApplicationId applicationId = ApplicationId.newInstance(1234, 5); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( + applicationId, 1); + List reports = client.getContainers(appAttemptId); + Assert.assertNotNull(reports); + Assert.assertEquals(reports.get(0).getContainerId().toString(), + (ContainerId.newInstance(expectedReports.get(0) + .getCurrentApplicationAttemptId(), 1)).toString()); + client.stop(); + } + + @Test(timeout = 10000) + public void testGetContainerReport() throws YarnException, IOException { + Configuration conf = new Configuration(); + final AHSClient client = new MockAHSClient(); + client.init(conf); + client.start(); + + List expectedReports = ((MockAHSClient) client) + .getReports(); + + ApplicationId applicationId = ApplicationId.newInstance(1234, 5); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( + applicationId, 1); + ContainerId containerId = ContainerId.newInstance(appAttemptId, 1); + ContainerReport report = client.getContainerReport(containerId); + Assert.assertNotNull(report); + Assert.assertEquals(report.getContainerId().toString(), + (ContainerId.newInstance(expectedReports.get(0) + .getCurrentApplicationAttemptId(), 1)).toString()); + client.stop(); + } + + private static class MockAHSClient extends AHSClientImpl { + // private ApplicationReport mockReport; + private List reports = new ArrayList(); + private HashMap> attempts = new HashMap>(); + private HashMap> containers = new HashMap>(); + GetApplicationsResponse mockAppResponse = mock(GetApplicationsResponse.class); + GetApplicationReportResponse mockResponse = mock(GetApplicationReportResponse.class); + GetApplicationAttemptsResponse mockAppAttemptsResponse = mock(GetApplicationAttemptsResponse.class); + GetApplicationAttemptReportResponse mockAttemptResponse = mock(GetApplicationAttemptReportResponse.class); + GetContainersResponse mockContainersResponse = mock(GetContainersResponse.class); + GetContainerReportResponse mockContainerResponse = mock(GetContainerReportResponse.class); + + public MockAHSClient() { + super(); + createAppReports(); + } + + @Override + public void start() { + ahsClient = mock(ApplicationHistoryProtocol.class); + + try { + when( + ahsClient + .getApplicationReport(any(GetApplicationReportRequest.class))) + .thenReturn(mockResponse); + when(ahsClient.getApplications(any(GetApplicationsRequest.class))) + .thenReturn(mockAppResponse); + when( + ahsClient + .getApplicationAttemptReport(any(GetApplicationAttemptReportRequest.class))) + .thenReturn(mockAttemptResponse); + when( + ahsClient + .getApplicationAttempts(any(GetApplicationAttemptsRequest.class))) + .thenReturn(mockAppAttemptsResponse); + when(ahsClient.getContainers(any(GetContainersRequest.class))) + .thenReturn(mockContainersResponse); + + when(ahsClient.getContainerReport(any(GetContainerReportRequest.class))) + .thenReturn(mockContainerResponse); + + } catch (YarnException e) { + Assert.fail("Exception is not expected."); + } catch (IOException e) { + Assert.fail("Exception is not expected."); + } + } + + @Override + public List getApplications() throws YarnException, + IOException { + when(mockAppResponse.getApplicationList()).thenReturn(reports); + return super.getApplications(); + } + + @Override + public ApplicationReport getApplicationReport(ApplicationId appId) + throws YarnException, IOException { + when(mockResponse.getApplicationReport()).thenReturn(getReport(appId)); + return super.getApplicationReport(appId); + } + + @Override + public List getApplicationAttempts( + ApplicationId appId) throws YarnException, IOException { + when(mockAppAttemptsResponse.getApplicationAttemptList()).thenReturn( + getAttempts(appId)); + return super.getApplicationAttempts(appId); + } + + @Override + public ApplicationAttemptReport getApplicationAttemptReport( + ApplicationAttemptId appAttemptId) throws YarnException, IOException { + when(mockAttemptResponse.getApplicationAttemptReport()).thenReturn( + getAttempt(appAttemptId)); + return super.getApplicationAttemptReport(appAttemptId); + } + + @Override + public List getContainers(ApplicationAttemptId appAttemptId) + throws YarnException, IOException { + when(mockContainersResponse.getContainerList()).thenReturn( + getContainersReport(appAttemptId)); + return super.getContainers(appAttemptId); + } + + @Override + public ContainerReport getContainerReport(ContainerId containerId) + throws YarnException, IOException { + when(mockContainerResponse.getContainerReport()).thenReturn( + getContainer(containerId)); + return super.getContainerReport(containerId); + } + + @Override + public void stop() { + } + + public ApplicationReport getReport(ApplicationId appId) { + for (int i = 0; i < reports.size(); ++i) { + if (appId.toString().equalsIgnoreCase( + reports.get(i).getApplicationId().toString())) { + return reports.get(i); + } + } + return null; + } + + public List getAttempts(ApplicationId appId) { + return attempts.get(appId); + } + + public ApplicationAttemptReport getAttempt(ApplicationAttemptId appAttemptId) { + return attempts.get(appAttemptId.getApplicationId()).get(0); + } + + public List getContainersReport( + ApplicationAttemptId appAttemptId) { + return containers.get(appAttemptId); + } + + public ContainerReport getContainer(ContainerId containerId) { + return containers.get(containerId.getApplicationAttemptId()).get(0); + } + + public List getReports() { + return this.reports; + } + + private void createAppReports() { + ApplicationId applicationId = ApplicationId.newInstance(1234, 5); + ApplicationReport newApplicationReport = ApplicationReport + .newInstance(applicationId, ApplicationAttemptId.newInstance( + applicationId, 1), "user", "queue", "appname", "host", 124, null, + YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0, + FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", + null); + List applicationReports = new ArrayList(); + applicationReports.add(newApplicationReport); + List appAttempts = new ArrayList(); + ApplicationAttemptReport attempt = ApplicationAttemptReport.newInstance( + newApplicationReport.getCurrentApplicationAttemptId(), "host", 124, + "url", "diagnostics", YarnApplicationAttemptState.FINISHED, + ContainerId.newInstance(newApplicationReport + .getCurrentApplicationAttemptId(), 1)); + appAttempts.add(attempt); + attempts.put(applicationId, appAttempts); + + List containerReports = new ArrayList(); + ContainerReport container = ContainerReport.newInstance(attempt + .getAMContainerId(), null, NodeId.newInstance("host", 1234), + Priority.UNDEFINED, 1234, 5678, "diagnosticInfo", "logURL", 0, null); + containerReports.add(container); + containers.put(newApplicationReport.getCurrentApplicationAttemptId(), + containerReports); + + ApplicationId applicationId2 = ApplicationId.newInstance(1234, 6); + ApplicationReport newApplicationReport2 = ApplicationReport.newInstance( + applicationId2, ApplicationAttemptId.newInstance(applicationId2, 2), + "user2", "queue2", "appname2", "host2", 125, null, + YarnApplicationState.FINISHED, "diagnostics2", "url2", 2, 2, + FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.63789f, "NON-YARN", + null); + applicationReports.add(newApplicationReport2); + + ApplicationId applicationId3 = ApplicationId.newInstance(1234, 7); + ApplicationReport newApplicationReport3 = ApplicationReport.newInstance( + applicationId3, ApplicationAttemptId.newInstance(applicationId3, 3), + "user3", "queue3", "appname3", "host3", 126, null, + YarnApplicationState.RUNNING, "diagnostics3", "url3", 3, 3, + FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.73789f, "MAPREDUCE", + null); + applicationReports.add(newApplicationReport3); + + ApplicationId applicationId4 = ApplicationId.newInstance(1234, 8); + ApplicationReport newApplicationReport4 = ApplicationReport.newInstance( + applicationId4, ApplicationAttemptId.newInstance(applicationId4, 4), + "user4", "queue4", "appname4", "host4", 127, null, + YarnApplicationState.FAILED, "diagnostics4", "url4", 4, 4, + FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.83789f, + "NON-MAPREDUCE", null); + applicationReports.add(newApplicationReport4); + reports = applicationReports; + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AHSProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AHSProxy.java new file mode 100644 index 0000000..daf86d4 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AHSProxy.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.security.PrivilegedAction; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.ipc.YarnRPC; + +import com.google.common.annotations.VisibleForTesting; + +@InterfaceAudience.Public +@InterfaceStability.Evolving +@SuppressWarnings("unchecked") +public class AHSProxy { + + private static final Log LOG = LogFactory.getLog(AHSProxy.class); + + public static T createAHSProxy(final Configuration conf, + final Class protocol, InetSocketAddress ahsAddress) throws IOException { + RetryPolicy retryPolicy = createRetryPolicy(conf); + T proxy = AHSProxy. getProxy(conf, protocol, ahsAddress); + LOG.info("Connecting to Application History server at " + ahsAddress); + return (T) RetryProxy.create(protocol, proxy, retryPolicy); + } + + private static T getProxy(final Configuration conf, + final Class protocol, final InetSocketAddress ahsAddress) + throws IOException { + return UserGroupInformation.getCurrentUser().doAs( + new PrivilegedAction() { + + @Override + public T run() { + return (T) YarnRPC.create(conf) + .getProxy(protocol, ahsAddress, conf); + } + }); + } + + @Private + @VisibleForTesting + public static RetryPolicy createRetryPolicy(Configuration conf) { + long rmConnectWaitMS = conf.getInt( + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, + YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS); + long rmConnectionRetryIntervalMS = conf.getLong( + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS); + + if (rmConnectionRetryIntervalMS < 0) { + throw new YarnRuntimeException("Invalid Configuration. " + + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS + + " should not be negative."); + } + + boolean waitForEver = (rmConnectWaitMS == -1); + + if (waitForEver) { + return RetryPolicies.RETRY_FOREVER; + } else { + if (rmConnectWaitMS < 0) { + throw new YarnRuntimeException("Invalid Configuration. " + + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS + + " can be -1, but can not be other negative numbers"); + } + + // try connect once + if (rmConnectWaitMS < rmConnectionRetryIntervalMS) { + LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS + + " is smaller than " + + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS + + ". Only try connect once."); + rmConnectWaitMS = 0; + } + } + + RetryPolicy retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep( + rmConnectWaitMS, rmConnectionRetryIntervalMS, TimeUnit.MILLISECONDS); + + Map, RetryPolicy> exceptionToPolicyMap = new HashMap, RetryPolicy>(); + exceptionToPolicyMap.put(ConnectException.class, retryPolicy); + // TO DO: after HADOOP-9576, IOException can be changed to EOFException + exceptionToPolicyMap.put(IOException.class, retryPolicy); + + return RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL, + exceptionToPolicyMap); + } +}