From 8491843d472e8f650d315a9af4ca874648dc3da5 Mon Sep 17 00:00:00 2001 From: Adam Antal Date: Wed, 11 Sep 2019 17:38:27 +0200 Subject: [PATCH] YARN-9808. Zero length files in container log output haven't got a header --- .../hadoop/yarn/client/cli/TestLogsCLI.java | 2 + .../yarn/logaggregation/LogToolUtils.java | 61 +++++++++---------- .../TestAggregatedLogFormat.java | 39 ++++++++++++ ...stLogAggregationIndexedFileController.java | 29 ++++++++- .../TestLogAggregationService.java | 2 + .../nodemanager/webapp/TestNMWebServices.java | 39 +++++++----- 6 files changed, 124 insertions(+), 48 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java index 7a229dc992a..7cc92c5e765 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java @@ -362,6 +362,8 @@ public void testHelpMessage() throws Exception { Assert.assertTrue(sysOutStream.toString().contains(appReportStr)); } + // TODO add test with empty container log + @Test (timeout = 15000) public void testFetchFinishedApplictionLogs() throws Exception { String remoteLogRootDir = "target/logs/"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java index c242d898482..e9bf6fc26f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java @@ -84,22 +84,20 @@ public static void outputContainerLog(String containerId, String nodeId, : (int) pendingRead; int len = fis.read(buf, 0, toRead); boolean keepGoing = (len != -1 && curRead < totalBytesToRead); - if (keepGoing) { - StringBuilder sb = new StringBuilder(); - String containerStr = String.format( - LogToolUtils.CONTAINER_ON_NODE_PATTERN, - containerId, nodeId); - sb.append(containerStr + "\n") - .append("LogAggregationType: " + logType + "\n") - .append(StringUtils.repeat("=", containerStr.length()) + "\n") - .append("LogType:" + fileName + "\n") - .append("LogLastModifiedTime:" + lastModifiedTime + "\n") - .append("LogLength:" + Long.toString(fileLength) + "\n") - .append("LogContents:\n"); - byte[] b = sb.toString().getBytes( - Charset.forName("UTF-8")); - os.write(b, 0, b.length); - } + StringBuilder sb = new StringBuilder(); + String containerStr = String.format( + LogToolUtils.CONTAINER_ON_NODE_PATTERN, + containerId, nodeId); + sb.append(containerStr + "\n") + .append("LogAggregationType: " + logType + "\n") + .append(StringUtils.repeat("=", containerStr.length()) + "\n") + .append("LogType:" + fileName + "\n") + .append("LogLastModifiedTime:" + lastModifiedTime + "\n") + .append("LogLength:" + Long.toString(fileLength) + "\n") + .append("LogContents:\n"); + byte[] b = sb.toString().getBytes( + Charset.forName("UTF-8")); + os.write(b, 0, b.length); while (keepGoing) { os.write(buf, 0, len); curRead += len; @@ -132,22 +130,23 @@ public static void outputContainerLogThroughZeroCopy(String containerId, } } + // output log summary + StringBuilder sb = new StringBuilder(); + String containerStr = String.format( + LogToolUtils.CONTAINER_ON_NODE_PATTERN, + containerId, nodeId); + sb.append(containerStr + "\n") + .append("LogAggregationType: " + logType + "\n") + .append(StringUtils.repeat("=", containerStr.length()) + "\n") + .append("LogType:" + fileName + "\n") + .append("LogLastModifiedTime:" + lastModifiedTime + "\n") + .append("LogLength:" + Long.toString(fileLength) + "\n") + .append("LogContents:\n"); + byte[] b = sb.toString().getBytes( + Charset.forName("UTF-8")); + os.write(b, 0, b.length); + if (totalBytesToRead > 0) { - // output log summary - StringBuilder sb = new StringBuilder(); - String containerStr = String.format( - LogToolUtils.CONTAINER_ON_NODE_PATTERN, - containerId, nodeId); - sb.append(containerStr + "\n") - .append("LogAggregationType: " + logType + "\n") - .append(StringUtils.repeat("=", containerStr.length()) + "\n") - .append("LogType:" + fileName + "\n") - .append("LogLastModifiedTime:" + lastModifiedTime + "\n") - .append("LogLength:" + Long.toString(fileLength) + "\n") - .append("LogContents:\n"); - byte[] b = sb.toString().getBytes( - Charset.forName("UTF-8")); - os.write(b, 0, b.length); // output log content FileChannel inputChannel = fis.getChannel(); WritableByteChannel outputChannel = Channels.newChannel(os); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java index 6c26c402902..91239189144 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java @@ -282,6 +282,45 @@ private void testReadAcontainerLog(boolean logUploadedTime) throws Exception { Assert.assertEquals(expectedLength, s.length()); } + @Test + public void testZeroLengthLog() throws IOException { + Configuration conf = new Configuration(); + File workDir = new File(testWorkDir, "testZeroLength"); + Path remoteAppLogFile = new Path(workDir.getAbsolutePath(), + "aggregatedLogFile"); + Path srcFileRoot = new Path(workDir.getAbsolutePath(), "srcFiles"); + ContainerId testContainerId = TestContainerId.newContainerId(1, 1, 1, 1); + Path t = new Path(srcFileRoot, testContainerId.getApplicationAttemptId() + .getApplicationId().toString()); + Path srcFilePath = new Path(t, testContainerId.toString()); + + // Create zero byte file + writeSrcFile(srcFilePath, "stdout", 0); + + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + try (LogWriter logWriter = new LogWriter()) { + logWriter.initialize(conf, remoteAppLogFile, ugi); + + LogKey logKey = new LogKey(testContainerId); + LogValue logValue = + new LogValue(Collections.singletonList(srcFileRoot.toString()), + testContainerId, ugi.getShortUserName()); + + logWriter.append(logKey, logValue); + } + + LogReader logReader = new LogReader(conf, remoteAppLogFile); + LogKey rLogKey = new LogKey(); + DataInputStream dis = logReader.next(rLogKey); + Writer writer = new StringWriter(); + LogReader.readAcontainerLogs(dis, writer); + + Assert.assertEquals("LogType:stdout\n" + + "LogLength:0\n" + + "Log Contents:\n\n" + + "End of LogType:stdout\n\n", writer.toString()); + } + @Test(timeout=10000) public void testContainerLogsFileAccess() throws IOException { // This test will run only if NativeIO is enabled as SecureIOUtils diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexedFileController.java index e63e469d982..5e11fb9d70c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexedFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexedFileController.java @@ -85,6 +85,7 @@ .createImmutable((short) (0777)); private static final UserGroupInformation USER_UGI = UserGroupInformation .createRemoteUser("testUser"); + private static final String ZERO_FILE = "zero"; private FileSystem fs; private ApplicationId appId; private ContainerId containerId; @@ -153,6 +154,8 @@ public void testLogAggregationIndexFileFormat() throws Exception { logType); files.add(file); } + files.add(createZeroLocalLogFile(appLogsDir)); + LogValue value = mock(LogValue.class); when(value.getPendingLogFilesToUploadForThisContainer()).thenReturn(files); @@ -212,12 +215,13 @@ public boolean isRollover(final FileContext fc, for (ContainerLogMeta log : meta) { assertEquals(containerId.toString(), log.getContainerId()); assertEquals(nodeId.toString(), log.getNodeId()); - assertEquals(3, log.getContainerLogMeta().size()); + assertEquals(4, log.getContainerLogMeta().size()); for (ContainerLogFileInfo file : log.getContainerLogMeta()) { fileNames.add(file.getFileName()); } } fileNames.removeAll(logTypes); + fileNames.remove(ZERO_FILE); assertTrue(fileNames.isEmpty()); boolean foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out); @@ -226,6 +230,7 @@ public boolean isRollover(final FileContext fc, assertTrue(sysOutStream.toString().contains(logMessage( containerId, logType))); } + assertZeroFileIsContained(sysOutStream.toString()); sysOutStream.reset(); Configuration factoryConf = new Configuration(getConf()); @@ -297,12 +302,13 @@ public boolean isRollover(final FileContext fc, for (ContainerLogMeta log : meta) { assertEquals(containerId.toString(), log.getContainerId()); assertEquals(nodeId.toString(), log.getNodeId()); - assertEquals(3, log.getContainerLogMeta().size()); + assertEquals(4, log.getContainerLogMeta().size()); for (ContainerLogFileInfo file : log.getContainerLogMeta()) { fileNames.add(file.getFileName()); } } fileNames.removeAll(logTypes); + fileNames.remove(ZERO_FILE); assertTrue(fileNames.isEmpty()); foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out); assertTrue(foundLogs); @@ -333,6 +339,7 @@ public boolean isRollover(final FileContext fc, } } fileNames.removeAll(newLogTypes); + fileNames.remove(ZERO_FILE); assertTrue(fileNames.isEmpty()); foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out); assertTrue(foundLogs); @@ -361,6 +368,7 @@ public boolean isRollover(final FileContext fc, } } fileNames.removeAll(newLogTypes); + fileNames.remove(ZERO_FILE); assertTrue(fileNames.isEmpty()); foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out); assertTrue(foundLogs); @@ -423,8 +431,23 @@ public void testFetchApplictionLogsHar() throws Exception { sysOutStream.reset(); } + private void assertZeroFileIsContained(String outStream) { + assertTrue(outStream.contains( + "LogContents:\n" + + "\n" + + "End of LogType:zero")); + } + + private File createZeroLocalLogFile(Path localLogDir) throws IOException { + return createAndWriteLocalLogFile(localLogDir, ZERO_FILE, ""); + } + private File createAndWriteLocalLogFile(ContainerId containerId, Path localLogDir, String logType) throws IOException { + return createAndWriteLocalLogFile(localLogDir, logType, logMessage(containerId, logType)); + } + + private File createAndWriteLocalLogFile(Path localLogDir, String logType, String message) throws IOException { File file = new File(localLogDir.toString(), logType); if (file.exists()) { file.delete(); @@ -433,7 +456,7 @@ private File createAndWriteLocalLogFile(ContainerId containerId, Writer writer = null; try { writer = new FileWriter(file); - writer.write(logMessage(containerId, logType)); + writer.write(message); writer.close(); return file; } finally { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 063215ee805..256a2a50b58 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -150,6 +150,8 @@ import com.google.common.base.Supplier; import org.slf4j.LoggerFactory; +// TODO add tests here? + public class TestLogAggregationService extends BaseContainerManagerTest { private Map acls = createAppAcls(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java index 740af8f9748..43581a301e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java @@ -28,6 +28,7 @@ import com.sun.jersey.api.client.WebResource; import com.sun.jersey.guice.spi.container.servlet.GuiceContainer; import com.sun.jersey.test.framework.WebAppDescriptor; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; @@ -118,6 +119,7 @@ private static LocalDirsHandlerService dirsHandler; private static WebApp nmWebApp; private static final String LOGSERVICEWSADDR = "test:1234"; + private static final String LOG_MESSAGE = "log message\n"; private static final File testRootDir = new File("target", TestNMWebServices.class.getSimpleName()); @@ -441,20 +443,26 @@ public void testSingleNodesXML() throws JSONException, Exception { @Test (timeout = 5000) public void testContainerLogsWithNewAPI() throws Exception { - final ContainerId containerId = BuilderUtils.newContainerId(0, 0, 0, 0); - WebResource r = resource(); - r = r.path("ws").path("v1").path("node").path("containers") - .path(containerId.toString()).path("logs"); - testContainerLogs(r, containerId); + ContainerId containerId0 = BuilderUtils.newContainerId(0, 0, 0, 0); + WebResource r0 = resource(); + r0 = r0.path("ws").path("v1").path("node").path("containers") + .path(containerId0.toString()).path("logs"); + testContainerLogs(r0, containerId0, LOG_MESSAGE); + + ContainerId containerId1 = BuilderUtils.newContainerId(0, 0, 0, 1); + WebResource r1 = resource(); + r1 = r1.path("ws").path("v1").path("node").path("containers") + .path(containerId1.toString()).path("logs"); + testContainerLogs(r1, containerId1, ""); } @Test (timeout = 5000) public void testContainerLogsWithOldAPI() throws Exception { - final ContainerId containerId = BuilderUtils.newContainerId(1, 1, 0, 1); + final ContainerId containerId2 = BuilderUtils.newContainerId(1, 1, 0, 2); WebResource r = resource(); r = r.path("ws").path("v1").path("node").path("containerlogs") - .path(containerId.toString()); - testContainerLogs(r, containerId); + .path(containerId2.toString()); + testContainerLogs(r, containerId2, LOG_MESSAGE); } @Test (timeout = 10000) @@ -583,7 +591,7 @@ public void testGetYarnGpuResourceInfo() 2, json.getJSONArray("assignedGpuDevices").length()); } - private void testContainerLogs(WebResource r, ContainerId containerId) + private void testContainerLogs(WebResource r, ContainerId containerId, String logMessage) throws Exception { final String containerIdStr = containerId.toString(); final ApplicationAttemptId appAttemptId = containerId @@ -591,7 +599,6 @@ private void testContainerLogs(WebResource r, ContainerId containerId) final ApplicationId appId = appAttemptId.getApplicationId(); final String appIdStr = appId.toString(); final String filename = "logfile1"; - final String logMessage = "log message\n"; nmContext.getApplications().put(appId, new ApplicationImpl(null, "user", appId, null, nmContext)); @@ -607,6 +614,9 @@ private void testContainerLogs(WebResource r, ContainerId containerId) File logFile = new File(path.toUri().getPath()); logFile.deleteOnExit(); + if (logFile.getParentFile().exists()) { + FileUtils.deleteDirectory(logFile.getParentFile()); + } assertTrue("Failed to create log dir", logFile.getParentFile().mkdirs()); PrintWriter pw = new PrintWriter(logFile); pw.print(logMessage); @@ -628,8 +638,9 @@ private void testContainerLogs(WebResource r, ContainerId containerId) .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); responseText = response.getEntity(String.class); responseLogMessage = getLogContext(responseText); - assertEquals(5, responseLogMessage.getBytes().length); - assertEquals(new String(logMessage.getBytes(), 0, 5), responseLogMessage); + int truncatedLength = Math.min(5, logMessage.getBytes().length); + assertEquals(truncatedLength, responseLogMessage.getBytes().length); + assertEquals(new String(logMessage.getBytes(), 0, truncatedLength), responseLogMessage); assertTrue(fullTextSize >= responseLogMessage.getBytes().length); // specify the bytes which is larger than the actual file size, @@ -649,9 +660,9 @@ private void testContainerLogs(WebResource r, ContainerId containerId) .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); responseText = response.getEntity(String.class); responseLogMessage = getLogContext(responseText); - assertEquals(5, responseLogMessage.getBytes().length); + assertEquals(truncatedLength, responseLogMessage.getBytes().length); assertEquals(new String(logMessage.getBytes(), - logMessage.getBytes().length - 5, 5), responseLogMessage); + logMessage.getBytes().length - truncatedLength, truncatedLength), responseLogMessage); assertTrue(fullTextSize >= responseLogMessage.getBytes().length); response = r.path(filename) -- 2.21.0