diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java index 6cdf918..b76c65a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java @@ -25,8 +25,10 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; import java.util.regex.Pattern; @@ -278,9 +280,7 @@ private int runCommand(String[] args) throws Exception { if (nodeAddress == null) { resultCode = fetchApplicationLogs(request, logCliHelper); } else { - System.err.println("Should at least provide ContainerId!"); - printHelpMessage(printOpts); - resultCode = -1; + resultCode = fetchContainerLogsFromSpecificNM(request, logCliHelper); } } return resultCode; @@ -557,6 +557,9 @@ private int printAMContainerLogs(Configuration conf, boolean getAMContainerLists = false; String appId = request.getAppId().toString(); String errorMessage = ""; + // We will call RM webservice to get all AppAttempts information. + // If we get nothing, we will try to call AHS webservice to get AppAttempts + // which includes nodeAddress for the AM Containers. 
try { amContainersList = getAMContainerInfoForRMWebService(conf, appId); if (amContainersList != null && !amContainersList.isEmpty()) { @@ -596,6 +599,9 @@ private int printAMContainerLogs(Configuration conf, System.err.println("Unable to get AM container informations " + "for the application:" + appId); System.err.println(errorMessage); + System.err.println("Can not get AMContainers logs for " + + "the application:" + appId + " with the appOwner:" + + request.getAppOwner()); return -1; } @@ -636,19 +642,12 @@ private void outputAMContainerLogs(ContainerLogsRequest request, if (request.isAppFinished()) { if (containerId != null && !containerId.isEmpty()) { - if (nodeId == null || nodeId.isEmpty()) { - try { - nodeId = - getContainerReport(containerId).getAssignedNode().toString(); - request.setNodeId(nodeId); - } catch (Exception ex) { - System.err.println(ex); - nodeId = null; - } - } if (nodeId != null && !nodeId.isEmpty()) { printContainerLogsForFinishedApplication(request, logCliHelper); + } else { + printContainerLogsForFinishedApplicationWithoutNodeId( + request, logCliHelper); } } } else { @@ -827,38 +826,8 @@ private int fetchAMContainerLogs(ContainerLogsRequest request, List amContainersList, LogCLIHelpers logCliHelper) throws Exception { - // If the application is running, we will call the RM WebService - // to get the AppAttempts which includes the nodeHttpAddress - // and containerId for all the AM Containers. - // After that, we will call NodeManager webService to get the - // related logs - if (!request.isAppFinished()) { - return printAMContainerLogs(getConf(), request, amContainersList, - logCliHelper); - } else { - // If the application is in the final state, we will call RM webservice - // to get all AppAttempts information first. If we get nothing, - // we will try to call AHS webservice to get related AppAttempts - // which includes nodeAddress for the AM Containers. 
- // After that, we will use nodeAddress and containerId - // to get logs from HDFS directly. - if (getConf().getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, - YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) { - return printAMContainerLogs(getConf(), request, amContainersList, - logCliHelper); - } else { - ApplicationId appId = request.getAppId(); - String appOwner = request.getAppOwner(); - System.err.println("Can not get AMContainers logs for " - + "the application:" + appId + " with the appOwner:" + appOwner); - System.err.println("This application:" + appId + " has finished." - + " Please enable the application-history service or explicitly" - + " use 'yarn logs -applicationId " - + "-containerId --nodeAddress ' " - + "to get the container logs."); - return -1; - } - } + return printAMContainerLogs(getConf(), request, amContainersList, + logCliHelper); } private int fetchContainerLogs(ContainerLogsRequest request, @@ -869,13 +838,18 @@ private int fetchContainerLogs(ContainerLogsRequest request, String nodeAddress = request.getNodeId(); String appOwner = request.getAppOwner(); boolean isAppFinished = request.isAppFinished(); - // if we provide the node address and the application is in the final - // state, we could directly get logs from HDFS. - if (nodeAddress != null && isAppFinished) { + // if application is in the final state, + // we could directly get logs from HDFS. 
+ if (isAppFinished) { // if user specified "ALL" as the logFiles param, pass empty list // to logCliHelper so that it fetches all the logs - return printContainerLogsForFinishedApplication( - request, logCliHelper); + if (nodeAddress != null && !nodeAddress.isEmpty()) { + return printContainerLogsForFinishedApplication( + request, logCliHelper); + } else { + return printContainerLogsForFinishedApplicationWithoutNodeId( + request, logCliHelper); + } } String nodeHttpAddress = null; String nodeId = null; @@ -1159,4 +1133,92 @@ public void setFileLength(String fileLength) { this.fileLength = fileLength; } } + + private int fetchContainerLogsFromSpecificNM(ContainerLogsRequest request, + LogCLIHelpers logCliHelper) throws Exception { + int resultCode = -1; + if (request.isAppFinished()) { + ContainerLogsRequest newOptions = getMatchedLogOptions( + request, logCliHelper); + if (newOptions == null) { + System.err.println("Can not find any log file matching the pattern: " + + request.getLogTypes() + " for the application: " + + request.getAppId()); + return -1; + } else { + resultCode = logCliHelper.dumpAllContainerLogsForASpecificNodeId( + newOptions); + } + } else { + ApplicationId appId = request.getAppId(); + String nodeId = request.getNodeId(); + Configuration conf = getConf(); + String nodeHttpAddress = null; + Client webServiceClient = Client.create(); + String rmWebAppAddress = WebAppUtils.getRMWebAppURLWithScheme(conf); + + // Call RMWebService to get NodeManager List + WebResource webResource = webServiceClient.resource(rmWebAppAddress); + ClientResponse response = + webResource.path("ws").path("v1").path("cluster").path("nodes") + .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); + JSONObject json = + response.getEntity(JSONObject.class).getJSONObject("nodes"); + JSONArray nodesJson = json.getJSONArray("node"); + for (int i = 0; i < nodesJson.length(); i++) { + if (nodesJson.getJSONObject(i).getString("id").equals(nodeId)) { + nodeHttpAddress = 
nodesJson.getJSONObject(i).getString("nodeHTTPAddress"); + break; + } + } + if (nodeHttpAddress == null) { + System.err.println("The NodeManager with the nodeId:" + nodeId + + " does not exist."); + return -1; + } + request.setNodeHttpAddress(nodeHttpAddress.replaceFirst( + WebAppUtils.getHttpSchemePrefix(getConf()), "")); + + List attemptsList = yarnClient + .getApplicationAttempts(appId); + if (attemptsList.isEmpty()) { + System.err.println("Can not find any available attempts " + + "in this application:" + appId); + return -1; + } + + List containerReports = + new ArrayList(); + for (ApplicationAttemptReport attempt : attemptsList) { + try { + List reports = yarnClient.getContainers( + attempt.getApplicationAttemptId()); + // filter the containers based on the nodeId + for (ContainerReport report : reports) { + if (report.getAssignedNode().toString().equals(nodeId)) { + containerReports.add(report); + } + } + } catch (Exception ex) { + // DO NOTHING + } + } + if (containerReports.isEmpty()) { + System.err.println("Can not find any containers " + + "which ran on the NodeManager:" + nodeId); + return -1; + } + for (ContainerReport report : containerReports) { + ContainerLogsRequest container = new ContainerLogsRequest(request); + container.setContainerId(report.getContainerId().toString()); + container.setContainerState(report.getContainerState()); + int result = printContainerLogsFromRunningApplication(getConf(), + container, logCliHelper); + if (result == 0) { + resultCode = 0; + } + } + } + return resultCode; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java index 329d856..1793d2d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java @@ -528,6 +528,103 @@ public ContainerReport getContainerReport(String containerIdStr) } @Test (timeout = 5000) + public void testFetchContainerLogsFromASpecificNode() throws Exception { + String remoteLogRootDir = "target/logs/"; + Configuration configuration = new Configuration(); + configuration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + configuration + .set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogRootDir); + configuration.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); + configuration.set(YarnConfiguration.YARN_ADMIN_ACL, "admin"); + FileSystem fs = FileSystem.get(configuration); + + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1); + ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2); + ContainerId containerId3 = ContainerId.newContainerId(appAttemptId, 3); + final NodeId nodeId = NodeId.newInstance("localhost", 1234); + final NodeId nodeId2 = NodeId.newInstance("testhost", 2345); + String rootLogDir = "target/LocalLogs"; + Path rootLogDirPath = new Path(rootLogDir); + try { + // create local logs + if (fs.exists(rootLogDirPath)) { + fs.delete(rootLogDirPath, true); + } + assertTrue(fs.mkdirs(rootLogDirPath)); + + Path appLogsDir = new Path(rootLogDirPath, appId.toString()); + if (fs.exists(appLogsDir)) { + fs.delete(appLogsDir, true); + } + assertTrue(fs.mkdirs(appLogsDir)); + + List rootLogDirs = Arrays.asList(rootLogDir); + + List logTypes = new ArrayList(); + logTypes.add("syslog"); + // create container logs in localLogDir + createContainerLogInLocalDir(appLogsDir, containerId1, fs, logTypes); + createContainerLogInLocalDir(appLogsDir, containerId2, fs, 
logTypes); + + // create two logs for container3 in localLogDir + logTypes.add("stdout"); + createContainerLogInLocalDir(appLogsDir, containerId3, fs, logTypes); + + Path path = new Path(remoteLogRootDir + ugi.getShortUserName() + + "/logs/application_0_0001"); + if (fs.exists(path)) { + fs.delete(path, true); + } + assertTrue(fs.mkdirs(path)); + + // upload container logs into remote directory + // The first two logs are empty. When we try to read the first two logs, + // we will hit an EOF exception, but it will not affect the other logs. + // The other logs should be read successfully. + uploadContainerLogIntoRemoteDir(ugi, configuration, rootLogDirs, nodeId, + containerId1, path, fs); + uploadContainerLogIntoRemoteDir(ugi, configuration, rootLogDirs, nodeId, + containerId2, path, fs); + uploadContainerLogIntoRemoteDir(ugi, configuration, rootLogDirs, nodeId2, + containerId3, path, fs); + + YarnClient mockYarnClient = createMockYarnClient( + YarnApplicationState.FINISHED, ugi.getShortUserName()); + LogsCLI cli = new LogsCLIForTest(mockYarnClient); + cli.setConf(configuration); + + int exitCode = cli.run(new String[] { "-applicationId", appId.toString(), + "-nodeAddress", nodeId.toString()}); + assertTrue(exitCode == 0); + assertTrue(sysOutStream.toString().contains( + "Hello container_0_0001_01_000001 in syslog!")); + assertTrue(sysOutStream.toString().contains( + "Hello container_0_0001_01_000002 in syslog!")); + assertTrue(!sysOutStream.toString().contains( + "Hello container_0_0001_01_000003 in syslog!")); + sysOutStream.reset(); + + exitCode = cli.run(new String[] { "-applicationId", appId.toString(), + "-nodeAddress", nodeId2.toString()}); + assertTrue(exitCode == 0); + assertTrue(!sysOutStream.toString().contains( + "Hello container_0_0001_01_000001 in syslog!")); + assertTrue(sysOutStream.toString().contains( + "Hello container_0_0001_01_000003 in syslog!")); + assertTrue(sysOutStream.toString().contains( + "Hello container_0_0001_01_000003 in stdout!")); + 
sysOutStream.reset(); + } finally { + fs.delete(new Path(remoteLogRootDir), true); + fs.delete(new Path(rootLogDir), true); + } + } + + @Test (timeout = 5000) public void testFetchRunningApplicationLogs() throws Exception { UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java index 13934d2..447eddb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java @@ -268,6 +268,80 @@ public int dumpAContainerLogsForLogTypeWithoutNodeId( } @Private + public int dumpAllContainerLogsForASpecificNodeId( + ContainerLogsRequest options) throws IOException { + ApplicationId appId = options.getAppId(); + String appOwner = options.getAppOwner(); + String localDir = options.getOutputLocalDir(); + List logTypes = options.getLogTypes(); + String nodeId = options.getNodeId(); + RemoteIterator nodeFiles = getRemoteNodeFileDir( + appId, appOwner); + if (nodeFiles == null) { + return -1; + } + boolean foundAnyLogs = false; + while (nodeFiles.hasNext()) { + FileStatus thisNodeFile = nodeFiles.next(); + String fileName = thisNodeFile.getPath().getName(); + if (fileName.contains(LogAggregationUtils.getNodeString(nodeId)) && + !fileName.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { + AggregatedLogFormat.LogReader reader = + new AggregatedLogFormat.LogReader(getConf(), + thisNodeFile.getPath()); + try { + DataInputStream valueStream; + LogKey key = new LogKey(); + valueStream = reader.next(key); + + while (valueStream != null) { + PrintStream out = createPrintStream(localDir, + thisNodeFile.getPath().getName(), key.toString()); + 
try { + String containerString = String.format( + CONTAINER_ON_NODE_PATTERN, key, + thisNodeFile.getPath().getName()); + out.println(containerString); + out.println(StringUtils.repeat("=", containerString.length())); + while (true) { + try { + if (logTypes == null || logTypes.isEmpty()) { + LogReader.readAContainerLogsForALogType(valueStream, out, + thisNodeFile.getModificationTime(), + options.getBytes()); + foundAnyLogs = true; + } else { + int result = LogReader.readContainerLogsForALogType( + valueStream, out, thisNodeFile.getModificationTime(), + logTypes, options.getBytes()); + if (result == 0) { + foundAnyLogs = true; + } + } + } catch (EOFException eof) { + break; + } + } + } finally { + closePrintStream(out); + } + // Next container + key = new LogKey(); + valueStream = reader.next(key); + } + } finally { + reader.close(); + } + } + } + if (!foundAnyLogs) { + emptyLogDir(getRemoteAppLogDir(appId, appOwner).toString()); + return -1; + } + return 0; + } + + @Private public int dumpAContainerLogs(String containerIdStr, AggregatedLogFormat.LogReader reader, PrintStream out, long logUploadedTime, long bytes) throws IOException {