diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java index c0d8795..ade5e89 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.io.PrintStream; import java.util.ArrayList; import java.util.Arrays; @@ -41,6 +42,7 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -465,13 +467,7 @@ public int printContainerLogsFromRunningApplication(Configuration conf, try { // fetch all the log files for the container // filter the log files based on the given -log_files pattern - List allLogFileInfos= - getContainerLogFiles(getConf(), containerIdStr, nodeHttpAddress); - List fileNames = new ArrayList(); - for (PerLogFileInfo fileInfo : allLogFileInfos) { - fileNames.add(fileInfo.getFileName()); - } - Set matchedFiles = getMatchedLogFiles(request, fileNames, + Set matchedFiles = getMatchedContainerLogFiles(request, useRegex); if (matchedFiles.isEmpty()) { System.err.println("Can not find any log file matching the pattern: " @@ -488,22 +484,33 @@ public int printContainerLogsFromRunningApplication(Configuration conf, out.println(containerString); out.println(StringUtils.repeat("=", containerString.length())); boolean foundAnyLogs = false; + byte[] buffer = new byte[65536]; for (String logFile : newOptions.getLogTypes()) { out.println("LogType:" + logFile); out.println("Log Upload Time:" + Times.format(System.currentTimeMillis())); out.println("Log Contents:"); + InputStream is = null; try { - WebResource webResource = - webServiceClient.resource(WebAppUtils.getHttpSchemePrefix(conf) - + nodeHttpAddress); - ClientResponse response = - webResource.path("ws").path("v1").path("node") - .path("containers").path(containerIdStr).path("logs") - .path(logFile) - .queryParam("size", Long.toString(request.getBytes())) - .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); - out.println(response.getEntity(String.class)); + ClientResponse response = getResponeFromNMWebService(conf, + webServiceClient, request, logFile); + if (response != null && response.getClientResponseStatus().equals( + ClientResponse.Status.OK)) { + is = response.getEntityInputStream(); + int len = 0; + while((len = is.read(buffer)) != -1) { + out.write(buffer, 0, len); + } + out.println(); + } else { + out.println("Can not get any logs for the log file: " + logFile); + String msg = "Response from the NodeManager:" + nodeId + + " WebService is " + ((response == null) ? "null": + "not successful," + " HTTP error code: " + + response.getStatus() + ", Server response:\n" + + response.getEntity(String.class)); + out.println(msg); + } StringBuilder sb = new StringBuilder(); sb.append("End of LogType:" + logFile + "."); if (request.getContainerState() == ContainerState.RUNNING) { @@ -518,6 +525,8 @@ public int printContainerLogsFromRunningApplication(Configuration conf, System.err.println("Can not find the log file:" + logFile + " for the container:" + containerIdStr + " in NodeManager:" + nodeId); + } finally { + IOUtils.closeQuietly(is); } } // for the case, we have already uploaded partial logs in HDFS @@ -1190,4 +1199,33 @@ public void setFileLength(String fileLength) { this.fileLength = fileLength; } } + + @VisibleForTesting + public Set getMatchedContainerLogFiles(ContainerLogsRequest request, + boolean useRegex) throws IOException { + // fetch all the log files for the container + // filter the log files based on the given -log_files pattern + List allLogFileInfos= + getContainerLogFiles(getConf(), request.getContainerId(), + request.getNodeHttpAddress()); + List fileNames = new ArrayList(); + for (PerLogFileInfo fileInfo : allLogFileInfos) { + fileNames.add(fileInfo.getFileName()); + } + return getMatchedLogFiles(request, fileNames, + useRegex); + } + + @VisibleForTesting + public ClientResponse getResponeFromNMWebService(Configuration conf, + Client webServiceClient, ContainerLogsRequest request, String logFile) { + WebResource webResource = + webServiceClient.resource(WebAppUtils.getHttpSchemePrefix(conf) + + request.getNodeHttpAddress()); + return webResource.path("ws").path("v1").path("node") + .path("containers").path(request.getContainerId()).path("logs") + .path(logFile) + .queryParam("size", Long.toString(request.getBytes())) + .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java index 19ddb2f..ec8e000 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -30,10 +31,13 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.File; +import java.io.FileInputStream; import java.io.FileWriter; import java.io.IOException; import java.io.InputStreamReader; @@ -63,6 +67,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; @@ -582,6 +587,96 @@ public ContainerReport getContainerReport(String containerIdStr) } @Test (timeout = 5000) + public void testGetRunningContainerLogs() throws Exception { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + + NodeId nodeId = NodeId.newInstance("localhost", 1234); + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId = ApplicationAttemptId + .newInstance(appId, 1); + + // Create a mock ApplicationAttempt Report + ApplicationAttemptReport mockAttemptReport = mock( + ApplicationAttemptReport.class); + doReturn(appAttemptId).when(mockAttemptReport).getApplicationAttemptId(); + List attemptReports = Arrays.asList( + mockAttemptReport); + + // Create one mock containerReport + ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1); + ContainerReport mockContainerReport1 = mock(ContainerReport.class); + doReturn(containerId1).when(mockContainerReport1).getContainerId(); + doReturn(nodeId).when(mockContainerReport1).getAssignedNode(); + doReturn("http://localhost:2345").when(mockContainerReport1) + .getNodeHttpAddress(); + doReturn(ContainerState.RUNNING).when(mockContainerReport1) + .getContainerState(); + List containerReports = Arrays.asList( + mockContainerReport1); + + // Mock the YarnClient, and it would report the previous created + // mockAttemptReport and previous two created mockContainerReports + YarnClient mockYarnClient = createMockYarnClient( + YarnApplicationState.RUNNING, ugi.getShortUserName(), true, + attemptReports, containerReports); + doReturn(mockContainerReport1).when(mockYarnClient).getContainerReport( + any(ContainerId.class)); + + // create local logs + Configuration configuration = new Configuration(); + FileSystem fs = FileSystem.get(configuration); + String rootLogDir = "target/LocalLogs"; + Path rootLogDirPath = new Path(rootLogDir); + if (fs.exists(rootLogDirPath)) { + fs.delete(rootLogDirPath, true); + } + assertTrue(fs.mkdirs(rootLogDirPath)); + + Path appLogsDir = new Path(rootLogDirPath, appId.toString()); + if (fs.exists(appLogsDir)) { + fs.delete(appLogsDir, true); + } + assertTrue(fs.mkdirs(appLogsDir)); + + String fileName = "syslog"; + List logTypes = new ArrayList(); + logTypes.add(fileName); + // create container logs in localLogDir + createContainerLogInLocalDir(appLogsDir, containerId1, fs, logTypes); + + Path containerDirPath = new Path(appLogsDir, containerId1.toString()); + Path logPath = new Path(containerDirPath, fileName); + File logFile = new File(logPath.toString()); + final FileInputStream fis = new FileInputStream(logFile); + + try { + LogsCLI cli = spy(new LogsCLIForTest(mockYarnClient)); + Set logsSet = new HashSet(); + logsSet.add(fileName); + doReturn(logsSet).when(cli).getMatchedContainerLogFiles( + any(ContainerLogsRequest.class), anyBoolean()); + ClientResponse mockReponse = mock(ClientResponse.class); + doReturn(ClientResponse.Status.OK).when(mockReponse) + .getClientResponseStatus(); + doReturn(fis).when(mockReponse).getEntityInputStream(); + doReturn(mockReponse).when(cli).getResponeFromNMWebService( + any(Configuration.class), + any(Client.class), + any(ContainerLogsRequest.class), anyString()); + cli.setConf(new YarnConfiguration()); + int exitCode = cli.run(new String[] {"-containerId", + containerId1.toString()}); + assertTrue(exitCode == 0); + assertTrue(sysOutStream.toString().contains( + logMessage(containerId1, "syslog"))); + sysOutStream.reset(); + } finally { + IOUtils.closeQuietly(fis); + fs.delete(new Path(rootLogDir), true); + } + } + + @Test (timeout = 5000) public void testFetchRunningApplicationLogs() throws Exception { UserGroupInformation ugi = UserGroupInformation.getCurrentUser();