diff --git hadoop-yarn-project/hadoop-yarn/bin/yarn hadoop-yarn-project/hadoop-yarn/bin/yarn index 9efbd33..f86a7a6 100644 --- hadoop-yarn-project/hadoop-yarn/bin/yarn +++ hadoop-yarn-project/hadoop-yarn/bin/yarn @@ -185,6 +185,9 @@ elif [ "$COMMAND" = "rmadmin" ] ; then elif [ "$COMMAND" = "application" ] ; then CLASS=org.apache.hadoop.yarn.client.cli.ApplicationCLI YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS" +elif [ "$COMMAND" = "history" ] ; then + CLASS=org.apache.hadoop.yarn.client.cli.AHSCLI + YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS" elif [ "$COMMAND" = "node" ] ; then CLASS=org.apache.hadoop.yarn.client.cli.NodeCLI YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS" diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AHSClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AHSClient.java new file mode 100644 index 0000000..e3f7453 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AHSClient.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.client.api.impl.AHSClientImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; + +@InterfaceAudience.Public +@InterfaceStability.Stable +public abstract class AHSClient extends AbstractService { + + /** + * Create a new instance of AHSClient. + */ + @Public + public static AHSClient createAHSClient() { + AHSClient client = new AHSClientImpl(); + return client; + } + + @Private + public AHSClient(String name) { + super(name); + } + + /** + *

+ * Get a report of the given Application. + *

+ * + *

+ * In secure mode, YARN verifies access to the application, queue + * etc. before accepting the request. + *

+ * + *

+ * If the user does not have VIEW_APP access then the following + * fields in the report will be set to stubbed values: + *

+ *

+ * + * @param appId + * {@link ApplicationId} of the application that needs a report + * @return application report + * @throws YarnException + * @throws IOException + */ + public abstract ApplicationReport getApplicationReport(ApplicationId appId) + throws YarnException, IOException; + + /** + *

+ * Get a report (ApplicationReport) of all Applications in the cluster. + *

+ * + *

+ * If the user does not have VIEW_APP access for an application + * then the corresponding report will be filtered as described in + * {@link #getApplicationReport(ApplicationId)}. + *

+ * + * @return a list of reports of all running applications + * @throws YarnException + * @throws IOException + */ + public abstract List getApplications() + throws YarnException, IOException; + + /** + *

+ * Get a report of the given ApplicationAttempt. + *

+ * + *

+ * In secure mode, YARN verifies access to the application, queue + * etc. before accepting the request. + *

+ * + *

+ * If the user does not have VIEW_APP access then the following + * fields in the report will be set to stubbed values: + *

+ *

+ * + * @param applicationAttemptId + * {@link ApplicationId} of the application that needs a report + * @return application report + * @throws YarnException + * @throws IOException + */ + public abstract ApplicationAttemptReport getApplicationAttemptReport( + ApplicationAttemptId appAttemptId) throws YarnException, IOException; + + /** + *

+ * Get a report of all (ApplicationAttempts) of Application in the cluster. + *

+ * + *

+ * If the user does not have VIEW_APP access for an application + * then the corresponding report will be filtered as described in + * {@link #getApplicationReport(ApplicationId)}. + *

+ * + * @param applicationId + * @return a list of reports of all running applications + * @throws YarnException + * @throws IOException + */ + public abstract List getApplicationAttempts( + ApplicationId appId) throws YarnException, IOException; + + /** + *

+ * Get a report of the given Container. + *

+ * + *

+ * In secure mode, YARN verifies access to the application, queue + * etc. before accepting the request. + *

+ * + *

+ * If the user does not have VIEW_APP access then the following + * fields in the report will be set to stubbed values: + *

+ *

+ * + * @param containerId + * {@link ApplicationId} of the application that needs a report + * @return application report + * @throws YarnException + * @throws IOException + */ + public abstract ContainerReport getContainerReport(ContainerId containerId) + throws YarnException, IOException; + + /** + *

+ * Get a report of all (Containers) of ApplicationAttempt in the cluster. + *

+ * + *

+ * If the user does not have VIEW_APP access for an application + * then the corresponding report will be filtered as described in + *

+ * + * @param applicationAttemptId + * @return a list of reports of all running applications + * @throws YarnException + * @throws IOException + */ + public abstract List getContainers( + ApplicationAttemptId applicationAttemptId) throws YarnException, + IOException; +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AHSClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AHSClientImpl.java new file mode 100644 index 0000000..11cc539 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AHSClientImpl.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.client.AHSProxy; +import org.apache.hadoop.yarn.client.api.AHSClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.util.Records; + +@Private +@Unstable +public class AHSClientImpl extends AHSClient { + + private static final Log LOG = LogFactory.getLog(AHSClientImpl.class); + + protected ApplicationHistoryProtocol ahsClient; + protected InetSocketAddress ahsAddress; + protected long statePollIntervalMillis; + + public AHSClientImpl() { + super(AHSClientImpl.class.getName()); + } + + private static InetSocketAddress getAHSAddress(Configuration conf) { + return conf.getSocketAddr(YarnConfiguration.AHS_HISTORY_ADDRESS, + YarnConfiguration.DEFAULT_AHS_ADDRESS, + YarnConfiguration.DEFAULT_AHS_PORT); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.ahsAddress = getAHSAddress(conf); + statePollIntervalMillis = conf.getLong( + YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS, + YarnConfiguration.DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + try { + ahsClient = AHSProxy.createAHSProxy(getConfig(), + ApplicationHistoryProtocol.class, this.ahsAddress); + } catch (IOException e) { + throw new YarnRuntimeException(e); + } + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (this.ahsClient != null) { + RPC.stopProxy(this.ahsClient); + } + super.serviceStop(); + } + + @Override + public ApplicationReport getApplicationReport(ApplicationId appId) + throws YarnException, IOException { + GetApplicationReportRequest request = Records + .newRecord(GetApplicationReportRequest.class); + request.setApplicationId(appId); + GetApplicationReportResponse response = ahsClient + .getApplicationReport(request); + return response.getApplicationReport(); + } + + @Override + public List getApplications() throws YarnException, + IOException { + GetApplicationsRequest request = GetApplicationsRequest.newInstance(null, + null); + GetApplicationsResponse response = ahsClient.getApplications(request); + return response.getApplicationList(); + } + + @Override + public ApplicationAttemptReport getApplicationAttemptReport( + ApplicationAttemptId applicationAttemptId) throws YarnException, + IOException { + GetApplicationAttemptReportRequest request = Records + .newRecord(GetApplicationAttemptReportRequest.class); + request.setApplicationAttemptId(applicationAttemptId); + GetApplicationAttemptReportResponse response = ahsClient + .getApplicationAttemptReport(request); + return response.getApplicationAttemptReport(); + } + + @Override + public List getApplicationAttempts( + ApplicationId appId) throws YarnException, IOException { + GetApplicationAttemptsRequest request = GetApplicationAttemptsRequest + .newInstance(appId); + GetApplicationAttemptsResponse response = ahsClient + .getApplicationAttempts(request); + return response.getApplicationAttemptList(); + } + + @Override + public ContainerReport getContainerReport(ContainerId containerId) + throws YarnException, IOException { + GetContainerReportRequest request = Records + .newRecord(GetContainerReportRequest.class); + request.setContainerId(containerId); + GetContainerReportResponse response = ahsClient.getContainerReport(request); + return response.getContainerReport(); + } + + @Override + public List getContainers( + ApplicationAttemptId applicationAttemptId) throws YarnException, + IOException { + GetContainersRequest request = GetContainersRequest + .newInstance(applicationAttemptId); + GetContainersResponse response = ahsClient.getContainers(request); + return response.getContainerList(); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/AHSCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/AHSCLI.java new file mode 100644 index 0000000..e10e09a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/AHSCLI.java @@ -0,0 +1,382 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.cli; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.io.PrintWriter; +import java.text.DecimalFormat; +import java.util.List; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.MissingArgumentException; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.client.api.AHSClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.ConverterUtils; + +import com.google.common.annotations.VisibleForTesting; + +public class AHSCLI extends Configured implements Tool { + + public static final String STATUS_CMD = "status"; + public static final String APPLICATION = "application"; + public static final String APPLICATION_ATTEMPT = "applicationattempt"; + public static final String CONTAINER = "container"; + public static final String HELP_CMD = "help"; + public static final String LIST_CMD = "list"; + protected PrintStream sysout; + protected PrintStream syserr; + protected AHSClient client; + + public AHSCLI() { + super(new YarnConfiguration()); + client = AHSClient.createAHSClient(); + client.init(getConf()); + client.start(); + } + + private static final String APPLICATIONS_PATTERN = "%30s\t%20s\t%20s\t%10s\t%10s\t%18s\t%18s\t%15s\t%35s" + + System.getProperty("line.separator"); + + public static void main(String[] args) throws Exception { + AHSCLI cli = new AHSCLI(); + cli.setSysOutPrintStream(System.out); + cli.setSysErrPrintStream(System.err); + int res = ToolRunner.run(cli, args); + cli.stop(); + System.exit(res); + } + + public void setSysOutPrintStream(PrintStream sysout) { + this.sysout = sysout; + } + + public void setSysErrPrintStream(PrintStream syserr) { + this.syserr = syserr; + } + + public AHSClient getClient() { + return client; + } + + public void setClient(AHSClient client) { + this.client = client; + } + + public void stop() { + this.client.stop(); + } + + @Override + public int run(String[] args) throws Exception { + Options opts = new Options(); + opts.addOption(STATUS_CMD, true, "Prints the status of the application."); + opts.getOption(STATUS_CMD).setArgName("Application ID"); + opts.addOption(LIST_CMD, false, "List applications from the AHS. "); + int exitCode = -1; + CommandLine cliParser = null; + try { + cliParser = new GnuParser().parse(opts, args); + } catch (MissingArgumentException ex) { + sysout.println("Missing argument for options"); + printUsage(opts); + return exitCode; + } + + Option[] opt = cliParser.getOptions(); + for (int i = 0; i < opt.length; i++) { + sysout.println(opt[i].toString()); + } + if (cliParser.hasOption(STATUS_CMD) + && args[0].equalsIgnoreCase(APPLICATION)) { + if (args.length != 3) { + printUsage(opts); + return exitCode; + } + printApplicationReport(cliParser.getOptionValue(STATUS_CMD)); + } else if (cliParser.hasOption(LIST_CMD) + && args[0].equalsIgnoreCase(APPLICATION)) { + if (args.length != 2) { + printUsage(opts); + return exitCode; + } + listApplications(); + } else if (cliParser.hasOption(STATUS_CMD) + && args[0].equalsIgnoreCase(APPLICATION_ATTEMPT)) { + if (args.length != 3) { + printUsage(opts); + return exitCode; + } + printApplicationAttemptReport(cliParser.getOptionValue(STATUS_CMD)); + } else if (cliParser.hasOption(LIST_CMD) + && args[0].equalsIgnoreCase(APPLICATION_ATTEMPT)) { + if (args.length != 3) { + printUsage(opts); + return exitCode; + } + listApplicationAttempts(cliParser.getOptionValue(LIST_CMD)); + } else if (cliParser.hasOption(STATUS_CMD) + && args[0].equalsIgnoreCase(CONTAINER)) { + if (args.length != 3) { + printUsage(opts); + return exitCode; + } + printContainerReport(cliParser.getOptionValue(STATUS_CMD)); + } else if (cliParser.hasOption(LIST_CMD) + && args[0].equalsIgnoreCase(CONTAINER)) { + if (args.length != 3) { + printUsage(opts); + return exitCode; + } + listContainers(cliParser.getOptionValue(LIST_CMD)); + } + return exitCode; + } + + /** + * Prints the application report for an application id. + * + * @param applicationId + * @throws YarnException + */ + private void printApplicationReport(String applicationId) + throws YarnException, IOException { + ApplicationReport appReport = client.getApplicationReport(ConverterUtils + .toApplicationId(applicationId)); + // Use PrintWriter.println, which uses correct platform line ending. + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintWriter appReportStr = new PrintWriter(baos); + if (appReport != null) { + appReportStr.println("Application Report : "); + appReportStr.print("\tApplication-Id : "); + appReportStr.println(appReport.getApplicationId()); + appReportStr.print("\tApplication-Name : "); + appReportStr.println(appReport.getName()); + appReportStr.print("\tApplication-Type : "); + appReportStr.println(appReport.getApplicationType()); + appReportStr.print("\tUser : "); + appReportStr.println(appReport.getUser()); + appReportStr.print("\tQueue : "); + appReportStr.println(appReport.getQueue()); + appReportStr.print("\tStart-Time : "); + appReportStr.println(appReport.getStartTime()); + appReportStr.print("\tFinish-Time : "); + appReportStr.println(appReport.getFinishTime()); + appReportStr.print("\tProgress : "); + DecimalFormat formatter = new DecimalFormat("###.##%"); + String progress = formatter.format(appReport.getProgress()); + appReportStr.println(progress); + appReportStr.print("\tState : "); + appReportStr.println(appReport.getYarnApplicationState()); + appReportStr.print("\tFinal-State : "); + appReportStr.println(appReport.getFinalApplicationStatus()); + appReportStr.print("\tTracking-URL : "); + appReportStr.println(appReport.getOriginalTrackingUrl()); + appReportStr.print("\tRPC Port : "); + appReportStr.println(appReport.getRpcPort()); + appReportStr.print("\tAM Host : "); + appReportStr.println(appReport.getHost()); + appReportStr.print("\tDiagnostics : "); + appReportStr.print(appReport.getDiagnostics()); + } else { + appReportStr.print("Application with id '" + applicationId + + "' doesn't exist in RM."); + } + appReportStr.close(); + sysout.println(baos.toString("UTF-8")); + } + + /** + * Prints the application report for an application id. + * + * @param applicationId + * @throws YarnException + */ + private void printApplicationAttemptReport(String applicationAttemptId) + throws YarnException, IOException { + ApplicationAttemptReport appAttemptReport = client + .getApplicationAttemptReport(ConverterUtils + .toApplicationAttemptId(applicationAttemptId)); + // Use PrintWriter.println, which uses correct platform line ending. + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintWriter appReportStr = new PrintWriter(baos); + if (appAttemptReport != null) { + appReportStr.println("Application Attempt Report : "); + appReportStr.print("\tApplicationAttempt-Id : "); + appReportStr.println(appAttemptReport.getApplicationAttemptId()); + appReportStr.print("\tState : "); + appReportStr.println(appAttemptReport.getYarnApplicationAttemptState()); + appReportStr.print("\tAMContainer : "); + appReportStr.println(appAttemptReport.getAMContainerId().toString()); + appReportStr.print("\tTracking-URL : "); + appReportStr.println(appAttemptReport.getTrackingUrl()); + appReportStr.print("\tRPC Port : "); + appReportStr.println(appAttemptReport.getRpcPort()); + appReportStr.print("\tAM Host : "); + appReportStr.println(appAttemptReport.getHost()); + appReportStr.print("\tDiagnostics : "); + appReportStr.print(appAttemptReport.getDiagnostics()); + } else { + appReportStr.print("Application Attempt with id '" + applicationAttemptId + + "' doesn't exist in History Server."); + } + appReportStr.close(); + sysout.println(baos.toString("UTF-8")); + } + + /** + * Prints the container report for an application attempt id. + * + * @param applicationAttemptId + * @throws YarnException + */ + private void printContainerReport(String containerId) throws YarnException, + IOException { + ContainerReport containerReport = client.getContainerReport((ConverterUtils + .toContainerId(containerId))); + // Use PrintWriter.println, which uses correct platform line ending. + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintWriter appReportStr = new PrintWriter(baos); + if (containerReport != null) { + appReportStr.println("Container Report : "); + appReportStr.print("\tContainer-Id : "); + appReportStr.println(containerReport.getContainerId()); + appReportStr.print("\tStart-Time : "); + appReportStr.println(containerReport.getStartTime()); + appReportStr.print("\tFinish-Time : "); + appReportStr.println(containerReport.getFinishTime()); + appReportStr.print("\tState : "); + appReportStr.println(containerReport.getContainerStatus().toString()); + appReportStr.print("\tLOG-URL : "); + appReportStr.println(containerReport.getLogURL()); + appReportStr.print("\tHost : "); + appReportStr.println(containerReport.getAssignedNode()); + appReportStr.print("\tDiagnostics : "); + appReportStr.print(containerReport.getDiagnosticsInfo()); + } else { + appReportStr.print("Container with id '" + containerId + + "' doesn't exist in Hostory Server."); + } + appReportStr.close(); + sysout.println(baos.toString("UTF-8")); + } + + /** + * It prints the usage of the command + * + * @param opts + */ + @VisibleForTesting + void printUsage(Options opts) { + new HelpFormatter().printHelp("application", opts); + } + + /** + * Lists the applications matching the given application Types And application + * States present in the Resource Manager + * + * @param appTypes + * @param appStates + * @throws YarnException + * @throws IOException + */ + private void listApplications() throws YarnException, IOException { + PrintWriter writer = new PrintWriter(sysout); + + List appsReport = client.getApplications(); + writer.println("Total number of applications " + ":" + appsReport.size()); + writer.printf(APPLICATIONS_PATTERN, "Application-Id", "Application-Name", + "Application-Type", "User", "Queue", "State", "Final-State", + "Progress", "Tracking-URL"); + for (ApplicationReport appReport : appsReport) { + DecimalFormat formatter = new DecimalFormat("###.##%"); + String progress = formatter.format(appReport.getProgress()); + writer.printf(APPLICATIONS_PATTERN, appReport.getApplicationId(), + appReport.getName(), appReport.getApplicationType(), + appReport.getUser(), appReport.getQueue(), + appReport.getYarnApplicationState(), + appReport.getFinalApplicationStatus(), progress, + appReport.getOriginalTrackingUrl()); + } + writer.flush(); + } + + /** + * Lists the application attempts matching the given applicationid + * + * @param applicationId + * @throws YarnException + * @throws IOException + */ + private void listApplicationAttempts(String appId) throws YarnException, + IOException { + PrintWriter writer = new PrintWriter(sysout); + + List appAttemptsReport = client + .getApplicationAttempts(ConverterUtils.toApplicationId(appId)); + writer.println("Total number of application attempts " + ":" + + appAttemptsReport.size()); + writer.printf(APPLICATIONS_PATTERN, "ApplicationAttempt-Id", "State", + "AM-Container-Id", "Tracking-URL"); + for (ApplicationAttemptReport appAttemptReport : appAttemptsReport) { + writer.printf(APPLICATIONS_PATTERN, appAttemptReport + .getApplicationAttemptId(), appAttemptReport + .getYarnApplicationAttemptState(), appAttemptReport + .getAMContainerId().toString(), appAttemptReport.getTrackingUrl()); + } + writer.flush(); + } + + /** + * Lists the containers matching the given application attempts + * + * @param appAttemptId + * @throws YarnException + * @throws IOException + */ + private void listContainers(String appAttemptId) throws YarnException, + IOException { + PrintWriter writer = new PrintWriter(sysout); + + List appsReport = client.getContainers(ConverterUtils + .toApplicationAttemptId(appAttemptId)); + writer.println("Total number of containers " + ":" + appsReport.size()); + writer.printf(APPLICATIONS_PATTERN, "Container-Id", "Start Time", + "Finish Time", "State", "Host", "LOG-URL"); + for (ContainerReport containerReport : appsReport) { + writer.printf(APPLICATIONS_PATTERN, containerReport.getContainerId(), + containerReport.getStartTime(), containerReport.getFinishTime(), + containerReport.getContainerStatus().getState().toString(), + containerReport.getAssignedNode(), containerReport.getLogURL()); + } + writer.flush(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationHistoryProtocolPBClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationHistoryProtocolPBClientImpl.java new file mode 100644 index 0000000..a11fda8 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationHistoryProtocolPBClientImpl.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.impl.pb.client; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.yarn.api.ApplicationClientProtocolPB; +import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol; +import org.apache.hadoop.yarn.api.ApplicationHistoryProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationAttemptReportRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationAttemptReportResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationAttemptsRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationAttemptsResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationReportRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationReportResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationsRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationsResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerReportRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerReportResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainersRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainersResponsePBImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptReportRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerReportRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainersRequestProto; + +import com.google.protobuf.ServiceException; + +public class ApplicationHistoryProtocolPBClientImpl implements + ApplicationHistoryProtocol, Closeable { + + private ApplicationHistoryProtocolPB proxy; + + public ApplicationHistoryProtocolPBClientImpl(long clientVersion, + InetSocketAddress addr, Configuration conf) throws IOException { + RPC.setProtocolEngine(conf, ApplicationHistoryProtocolPB.class, + ProtobufRpcEngine.class); + proxy = RPC.getProxy(ApplicationHistoryProtocolPB.class, clientVersion, + addr, conf); + } + + @Override + public void close() throws IOException { + if (this.proxy != null) { + RPC.stopProxy(this.proxy); + } + } + + @Override + public GetApplicationReportResponse getApplicationReport( + GetApplicationReportRequest request) throws YarnException, IOException { + GetApplicationReportRequestProto requestProto = ((GetApplicationReportRequestPBImpl) request) + .getProto(); + try { + return new GetApplicationReportResponsePBImpl(proxy.getApplicationReport( + null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public GetApplicationsResponse getApplications(GetApplicationsRequest request) + throws YarnException, IOException { + GetApplicationsRequestProto requestProto = ((GetApplicationsRequestPBImpl) request) + .getProto(); + try { + return new GetApplicationsResponsePBImpl(proxy.getApplications(null, + requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public GetApplicationAttemptReportResponse getApplicationAttemptReport( + GetApplicationAttemptReportRequest request) throws YarnException, + IOException { + GetApplicationAttemptReportRequestProto requestProto = ((GetApplicationAttemptReportRequestPBImpl) request) + .getProto(); + try { + return new GetApplicationAttemptReportResponsePBImpl( + proxy.getApplicationAttemptReport(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public GetApplicationAttemptsResponse getApplicationAttempts( + GetApplicationAttemptsRequest request) throws YarnException, IOException { + GetApplicationAttemptsRequestProto requestProto = ((GetApplicationAttemptsRequestPBImpl) request) + .getProto(); + try { + return new GetApplicationAttemptsResponsePBImpl( + proxy.getApplicationAttempts(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public GetContainerReportResponse getContainerReport( + GetContainerReportRequest request) throws YarnException, IOException { + GetContainerReportRequestProto requestProto = ((GetContainerReportRequestPBImpl) request) + .getProto(); + try { + return new GetContainerReportResponsePBImpl(proxy.getContainerReport( + null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public GetContainersResponse getContainers(GetContainersRequest request) + throws YarnException, IOException { + GetContainersRequestProto requestProto = ((GetContainersRequestPBImpl) request) + .getProto(); + try { + return new GetContainersResponsePBImpl(proxy.getContainers(null, + requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public GetDelegationTokenResponse getDelegationToken( + GetDelegationTokenRequest request) throws YarnException, IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public RenewDelegationTokenResponse renewDelegationToken( + RenewDelegationTokenRequest request) throws YarnException, IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public CancelDelegationTokenResponse cancelDelegationToken( + CancelDelegationTokenRequest request) throws YarnException, IOException { + // TODO Auto-generated method stub + return null; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AHSProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AHSProxy.java new file mode 100644 index 0000000..daf86d4 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AHSProxy.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.security.PrivilegedAction; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.ipc.YarnRPC; + +import com.google.common.annotations.VisibleForTesting; + +@InterfaceAudience.Public +@InterfaceStability.Evolving +@SuppressWarnings("unchecked") +public class AHSProxy { + + private static final Log LOG = LogFactory.getLog(AHSProxy.class); + + public static T createAHSProxy(final Configuration conf, + final Class protocol, InetSocketAddress ahsAddress) throws IOException { + RetryPolicy retryPolicy = createRetryPolicy(conf); + T proxy = AHSProxy. getProxy(conf, protocol, ahsAddress); + LOG.info("Connecting to Application History server at " + ahsAddress); + return (T) RetryProxy.create(protocol, proxy, retryPolicy); + } + + private static T getProxy(final Configuration conf, + final Class protocol, final InetSocketAddress ahsAddress) + throws IOException { + return UserGroupInformation.getCurrentUser().doAs( + new PrivilegedAction() { + + @Override + public T run() { + return (T) YarnRPC.create(conf) + .getProxy(protocol, ahsAddress, conf); + } + }); + } + + @Private + @VisibleForTesting + public static RetryPolicy createRetryPolicy(Configuration conf) { + long rmConnectWaitMS = conf.getInt( + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, + YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS); + long rmConnectionRetryIntervalMS = conf.getLong( + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS); + + if (rmConnectionRetryIntervalMS < 0) { + throw new YarnRuntimeException("Invalid Configuration. " + + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS + + " should not be negative."); + } + + boolean waitForEver = (rmConnectWaitMS == -1); + + if (waitForEver) { + return RetryPolicies.RETRY_FOREVER; + } else { + if (rmConnectWaitMS < 0) { + throw new YarnRuntimeException("Invalid Configuration. " + + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS + + " can be -1, but can not be other negative numbers"); + } + + // try connect once + if (rmConnectWaitMS < rmConnectionRetryIntervalMS) { + LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS + + " is smaller than " + + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS + + ". Only try connect once."); + rmConnectWaitMS = 0; + } + } + + RetryPolicy retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep( + rmConnectWaitMS, rmConnectionRetryIntervalMS, TimeUnit.MILLISECONDS); + + Map, RetryPolicy> exceptionToPolicyMap = new HashMap, RetryPolicy>(); + exceptionToPolicyMap.put(ConnectException.class, retryPolicy); + // TO DO: after HADOOP-9576, IOException can be changed to EOFException + exceptionToPolicyMap.put(IOException.class, retryPolicy); + + return RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL, + exceptionToPolicyMap); + } +}