diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java index 6cdf918..c279db8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java @@ -25,8 +25,10 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; import java.util.regex.Pattern; @@ -278,9 +280,7 @@ private int runCommand(String[] args) throws Exception { if (nodeAddress == null) { resultCode = fetchApplicationLogs(request, logCliHelper); } else { - System.err.println("Should at least provide ContainerId!"); - printHelpMessage(printOpts); - resultCode = -1; + resultCode = fetchContainerLogsFromSpecificNM(request, logCliHelper); } } return resultCode; @@ -1159,4 +1159,149 @@ public void setFileLength(String fileLength) { this.fileLength = fileLength; } } + + private int fetchContainerLogsFromSpecificNM(ContainerLogsRequest request, + LogCLIHelpers logCliHelper) throws ClientHandlerException, + UniformInterfaceException, JSONException, IOException { + int resultCode = -1; + if (request.isAppFinished()) { + resultCode = logCliHelper.dumpAllContainerLogsForASpecificNodeId( + request); + } else { + String appId = request.getAppId().toString(); + String nodeId = request.getNodeId(); + Configuration conf = getConf(); + String nodeHttpAddress = null; + Client webServiceClient = Client.create(); + String rmWebAppAddress = WebAppUtils.getRMWebAppURLWithScheme(conf); + String ahsWebAppAddress = WebAppUtils.getHttpSchemePrefix(conf) + + WebAppUtils.getAHSWebAppURLWithoutScheme(conf); + + // Call RMWebService to get NodeManager List + WebResource webResource = webServiceClient.resource(rmWebAppAddress); + ClientResponse response = + webResource.path("ws").path("v1").path("cluster").path("nodes") + .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); + JSONObject json = + response.getEntity(JSONObject.class).getJSONObject("nodes"); + JSONArray nodesJson = json.getJSONArray("node"); + for (int i = 0; i < nodesJson.length(); i++) { + if (nodesJson.getJSONObject(i).getString("id").equals(nodeId)) { + nodeHttpAddress = nodesJson.getJSONObject(i).getString("nodeHTTPAddress"); + break; + } + } + if (nodeHttpAddress == null) { + System.err.println("The NodeManager with the nodeId:" + nodeId + + " does not exist."); + return -1; + } + request.setNodeHttpAddress(nodeHttpAddress.replaceFirst( + WebAppUtils.getHttpSchemePrefix(getConf()), "")); + + // Call RMWebService to get AppAttempt List + response = webResource.path("ws").path("v1").path("cluster").path("apps") + .path(appId).path("appattempts").accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + JSONObject attemptsJson = + response.getEntity(JSONObject.class).getJSONObject("appAttempts"); + JSONArray attemptsArray = attemptsJson.getJSONArray("appAttempt"); + List attemptsList = new ArrayList(); + for (int i = 0; i < attemptsArray.length(); i++) { + attemptsList.add(attemptsArray.getJSONObject(i) + .getString("appAttemptId")); + } + if (attemptsList.isEmpty()) { + System.err.println("Can not find any available attempts " + + "in this application:" + appId); + return -1; + } + // call RMWebService And AHSWebService to get all the containers + // for this application, and filter the list with specific nodeId + Map containersMap = new TreeMap< + String, ContainerState>(); + for (String attemptId : attemptsList) { + // call RMWebService + collectContainerInfo(containersMap, filterContainersBasedOnNodeId( + rmWebAppAddress, appId, attemptId, nodeId, true)); + // call AHSWebService + collectContainerInfo(containersMap, filterContainersBasedOnNodeId( + ahsWebAppAddress, appId, attemptId, nodeId, false)); + } + if (containersMap.isEmpty()) { + System.err.println("Can not find any containers " + + "which ran on the NodeManager:" + nodeId); + return -1; + } + for (Entry containerInfo : + containersMap.entrySet()) { + ContainerLogsRequest container = new ContainerLogsRequest(request); + container.setContainerId(containerInfo.getKey()); + container.setContainerState(containerInfo.getValue()); + int result = printContainerLogsFromRunningApplication(getConf(), + container, logCliHelper); + if (result == 0) { + resultCode = 0; + } + } + } + return resultCode; + } + + private Map filterContainersBasedOnNodeId( + String webAppAddress, String appId, String appAttemptId, + String nodeId, boolean rmWebService) { + Map containersMap = new HashMap< + String, ContainerState>(); + try { + JSONArray containers = getContainerListFromWebService( + webAppAddress, appId, appAttemptId, rmWebService); + for (int i = 0; i < containers.length(); i++) { + JSONObject containerJSON = containers.getJSONObject(i); + if (containerJSON.getString("assignedNodeId").equals(nodeId)) { + String containerId = containerJSON.getString("containerId"); + String containerState = null; + if (containerJSON.has("containerState")) { + containerState = containerJSON.getString("containerState"); + } + containersMap.put(containerId, containerState == null ? null : + ContainerState.valueOf(containerState)); + } + } + } catch (Exception ex) { + System.out.println(ex.getMessage()); + } + return containersMap; + } + + private JSONArray getContainerListFromWebService(String webAppAddress, + String appId, String appAttemptId, + boolean rmWebService) throws ClientHandlerException, + UniformInterfaceException, JSONException { + String key = rmWebService ? "cluster" : "applicationhistory"; + Client webServiceClient = Client.create(); + WebResource webResource = webServiceClient.resource(webAppAddress); + ClientResponse response = webResource.path("ws").path("v1") + .path(key).path("apps").path(appId).path("appattempts") + .path(appAttemptId).path("containers") + .accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + return response.getEntity(JSONObject.class).getJSONArray("container"); + } + + private void collectContainerInfo(Map containersMap, + Map filteredMap) { + for (Entry container : filteredMap.entrySet()) { + String containerId = container.getKey(); + ContainerState containerState = container.getValue(); + if (containersMap.containsKey(containerId)) { + if (containerState != null && containersMap.get(containerId) + != ContainerState.COMPLETE) { + containersMap.put(containerId, containerState); + } + } else { + containersMap.put(containerId, containerState); + } + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java index bda489f..32cfe6c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java @@ -528,6 +528,103 @@ public ContainerReport getContainerReport(String containerIdStr) } @Test (timeout = 5000) + public void testFetchContainerLogsFromASpecificNode() throws Exception { + String remoteLogRootDir = "target/logs/"; + Configuration configuration = new Configuration(); + configuration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + configuration + .set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogRootDir); + configuration.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); + configuration.set(YarnConfiguration.YARN_ADMIN_ACL, "admin"); + FileSystem fs = FileSystem.get(configuration); + + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1); + ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2); + ContainerId containerId3 = ContainerId.newContainerId(appAttemptId, 3); + final NodeId nodeId = NodeId.newInstance("localhost", 1234); + final NodeId nodeId2 = NodeId.newInstance("testhost", 2345); + String rootLogDir = "target/LocalLogs"; + Path rootLogDirPath = new Path(rootLogDir); + try { + // create local logs + if (fs.exists(rootLogDirPath)) { + fs.delete(rootLogDirPath, true); + } + assertTrue(fs.mkdirs(rootLogDirPath)); + + Path appLogsDir = new Path(rootLogDirPath, appId.toString()); + if (fs.exists(appLogsDir)) { + fs.delete(appLogsDir, true); + } + assertTrue(fs.mkdirs(appLogsDir)); + + List rootLogDirs = Arrays.asList(rootLogDir); + + List logTypes = new ArrayList(); + logTypes.add("syslog"); + // create container logs in localLogDir + createContainerLogInLocalDir(appLogsDir, containerId1, fs, logTypes); + createContainerLogInLocalDir(appLogsDir, containerId2, fs, logTypes); + + // create two logs for container3 in localLogDir + logTypes.add("stdout"); + createContainerLogInLocalDir(appLogsDir, containerId3, fs, logTypes); + + Path path = new Path(remoteLogRootDir + ugi.getShortUserName() + + "/logs/application_0_0001"); + if (fs.exists(path)) { + fs.delete(path, true); + } + assertTrue(fs.mkdirs(path)); + + // upload container logs into remote directory + // the first two logs is empty. When we try to read first two logs, + // we will meet EOF exception, but it will not impact other logs. + // Other logs should be read successfully. + uploadContainerLogIntoRemoteDir(ugi, configuration, rootLogDirs, nodeId, + containerId1, path, fs); + uploadContainerLogIntoRemoteDir(ugi, configuration, rootLogDirs, nodeId, + containerId2, path, fs); + uploadContainerLogIntoRemoteDir(ugi, configuration, rootLogDirs, nodeId2, + containerId3, path, fs); + + YarnClient mockYarnClient = createMockYarnClient( + YarnApplicationState.FINISHED, ugi.getShortUserName()); + LogsCLI cli = new LogsCLIForTest(mockYarnClient); + cli.setConf(configuration); + + int exitCode = cli.run(new String[] { "-applicationId", appId.toString(), + "-nodeAddress", nodeId.toString()}); + assertTrue(exitCode == 0); + assertTrue(sysOutStream.toString().contains( + "Hello container_0_0001_01_000001 in syslog!")); + assertTrue(sysOutStream.toString().contains( + "Hello container_0_0001_01_000002 in syslog!")); + assertTrue(!sysOutStream.toString().contains( + "Hello container_0_0001_01_000003 in syslog!")); + sysOutStream.reset(); + + exitCode = cli.run(new String[] { "-applicationId", appId.toString(), + "-nodeAddress", nodeId2.toString()}); + assertTrue(exitCode == 0); + assertTrue(!sysOutStream.toString().contains( + "Hello container_0_0001_01_000001 in syslog!")); + assertTrue(sysOutStream.toString().contains( + "Hello container_0_0001_01_000003 in syslog!")); + assertTrue(sysOutStream.toString().contains( + "Hello container_0_0001_01_000003 in stdout!")); + sysOutStream.reset(); + } finally { + fs.delete(new Path(remoteLogRootDir), true); + fs.delete(new Path(rootLogDir), true); + } + } + + @Test (timeout = 5000) public void testFetchRunningApplicationLogs() throws Exception { UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java index ef56504..a9766b5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java @@ -257,6 +257,80 @@ public int dumpAContainerLogsForLogTypeWithoutNodeId( } @Private + public int dumpAllContainerLogsForASpecificNodeId( + ContainerLogsRequest options) throws IOException { + ApplicationId appId = options.getAppId(); + String appOwner = options.getAppOwner(); + String localDir = options.getOutputLocalDir(); + List logTypes = options.getLogTypes(); + String nodeId = options.getNodeId(); + RemoteIterator nodeFiles = getRemoteNodeFileDir( + appId, appOwner); + if (nodeFiles == null) { + return -1; + } + boolean foundAnyLogs = false; + while (nodeFiles.hasNext()) { + FileStatus thisNodeFile = nodeFiles.next(); + String fileName = thisNodeFile.getPath().getName(); + if (fileName.contains(LogAggregationUtils.getNodeString(nodeId)) && + !fileName.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { + AggregatedLogFormat.LogReader reader = + new AggregatedLogFormat.LogReader(getConf(), + thisNodeFile.getPath()); + try { + DataInputStream valueStream; + LogKey key = new LogKey(); + valueStream = reader.next(key); + + while (valueStream != null) { + PrintStream out = createPrintStream(localDir, + thisNodeFile.getPath().getName(), key.toString()); + try { + String containerString = String.format( + CONTAINER_ON_NODE_PATTERN, key, + thisNodeFile.getPath().getName()); + out.println(containerString); + out.println(StringUtils.repeat("=", containerString.length())); + while (true) { + try { + if (logTypes == null || logTypes.isEmpty()) { + LogReader.readAContainerLogsForALogType(valueStream, out, + thisNodeFile.getModificationTime(), + options.getBytes()); + foundAnyLogs = true; + } else { + int result = LogReader.readContainerLogsForALogType( + valueStream, out, thisNodeFile.getModificationTime(), + logTypes, options.getBytes()); + if (result == 0) { + foundAnyLogs = true; + } + } + } catch (EOFException eof) { + break; + } + } + } finally { + closePrintStream(out); + } + // Next container + key = new LogKey(); + valueStream = reader.next(key); + } + } finally { + reader.close(); + } + } + } + if (!foundAnyLogs) { + emptyLogDir(getRemoteAppLogDir(appId, appOwner).toString()); + return -1; + } + return 0; + } + + @Private public int dumpAContainerLogs(String containerIdStr, AggregatedLogFormat.LogReader reader, PrintStream out, long logUploadedTime, long bytes) throws IOException {