diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/JAXBContextResolver.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/JAXBContextResolver.java index 46d3928..b0c662b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/JAXBContextResolver.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/JAXBContextResolver.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.AppsInfo; import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainerInfo; import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainersInfo; +import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.LogInfo; import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NodeInfo; import org.apache.hadoop.yarn.webapp.RemoteExceptionData; @@ -47,7 +48,7 @@ // you have to specify all the dao classes here private final Class[] cTypes = {AppInfo.class, AppsInfo.class, ContainerInfo.class, ContainersInfo.class, NodeInfo.class, - RemoteExceptionData.class}; + RemoteExceptionData.class, LogInfo.class}; public JAXBContextResolver() throws Exception { this.types = new HashSet(Arrays.asList(cTypes)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java index 02b2ab0..688adb2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java @@ -21,10 +21,14 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.OutputStream; +import java.nio.channels.Channels; +import java.nio.channels.FileChannel; +import java.nio.channels.WritableByteChannel; import java.util.Map.Entry; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.PathParam; @@ -53,6 +57,7 @@ import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.AppsInfo; import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainerInfo; import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainersInfo; +import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.LogInfo; import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NodeInfo; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.webapp.BadRequestException; @@ -71,6 +76,7 @@ private WebApp webapp; private static RecordFactory recordFactory = RecordFactoryProvider .getRecordFactory(null); + private static final Long DEFAULT_PAGE_SIZE = 1024*1024L; //1MB private @javax.ws.rs.core.Context HttpServletRequest request; @@ -195,6 +201,33 @@ public ContainerInfo getNodeContainer(@javax.ws.rs.core.Context } + @GET + @Path("/containerlogs-info/{containerid}/{filename}") + @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public LogInfo getLogsInfo(@PathParam("containerid") String containerIdStr, + @PathParam("filename") String filename, @QueryParam("pagesize") Long pageSize) { + ContainerId containerId; + try { + containerId = ContainerId.fromString(containerIdStr); + } catch (IllegalArgumentException ex) { + throw new BadRequestException("invalid container id, " + containerIdStr); + } + File logFile = null; + try { + logFile = ContainerLogsUtils.getContainerLogFile(containerId, filename, + request.getRemoteUser(), nmContext); + } catch (YarnException ex) { + throw new RuntimeException( + "Failed to get logs of container " + containerIdStr, ex); + } + long totalSize = logFile.length(); + if(pageSize == null){ + pageSize = DEFAULT_PAGE_SIZE; + } + long pageCount = (long) Math.ceil((float) totalSize / pageSize); + return new LogInfo(totalSize, pageSize, pageCount); + } + /** * Returns the contents of a container's log file in plain text. * @@ -206,6 +239,11 @@ public ContainerInfo getNodeContainer(@javax.ws.rs.core.Context * The container ID * @param filename * The name of the log file + * @param pageSize + * The max bytes of one page , default 1MB + * @param pageIndex + * The index of required page, default 0, can be nagative + * (set -1 will get the last page content) * @return * The contents of the container's log file */ @@ -215,7 +253,9 @@ public ContainerInfo getNodeContainer(@javax.ws.rs.core.Context @Public @Unstable public Response getLogs(@PathParam("containerid") String containerIdStr, - @PathParam("filename") String filename) { + @PathParam("filename") String filename, + @QueryParam("pagesize") Long pageSize, + @DefaultValue("0") @QueryParam("pageindex") Long pageIndex) { ContainerId containerId; try { containerId = ContainerId.fromString(containerIdStr); @@ -232,24 +272,51 @@ public Response getLogs(@PathParam("containerid") String containerIdStr, } catch (YarnException ex) { return Response.serverError().entity(ex.getMessage()).build(); } - + + long totalSize = logFile.length(); + if (pageSize == null) { + pageSize = DEFAULT_PAGE_SIZE; + } + long pageCount = (long) Math.ceil((float) totalSize / pageSize); + if (pageIndex < 0) { + pageIndex += pageCount; + } + if (pageIndex < 0 || pageIndex >= pageCount) { + return Response.status(Status.NOT_FOUND) + .entity("Page index out of range! Total page count is " + pageCount) + .build(); + } + try { final FileInputStream fis = ContainerLogsUtils.openLogFileForRead( containerIdStr, logFile, nmContext); + final FileChannel inputChannel = fis.getChannel(); + final long startIndex = pageIndex * pageSize; + final long toRead = + pageIndex < pageCount - 1 ? pageSize : totalSize - startIndex; StreamingOutput stream = new StreamingOutput() { @Override public void write(OutputStream os) throws IOException, WebApplicationException { + WritableByteChannel outputChannel = Channels.newChannel(os); try { int bufferSize = 65536; - byte[] buf = new byte[bufferSize]; - int len; - while ((len = fis.read(buf, 0, bufferSize)) > 0) { - os.write(buf, 0, len); + long position = startIndex; + long leftToRead = toRead; + int currentToRead = + leftToRead > bufferSize ? bufferSize : (int) leftToRead; + long transferedLen; + while (currentToRead > 0 && (transferedLen = inputChannel.transferTo(position, currentToRead, outputChannel)) + > 0) { + leftToRead -= transferedLen; + position += transferedLen; + currentToRead = + leftToRead > bufferSize ? bufferSize : (int) leftToRead; } os.flush(); - } finally { + }finally{ + IOUtils.closeQuietly(outputChannel); IOUtils.closeQuietly(fis); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/LogInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/LogInfo.java new file mode 100644 index 0000000..f234f23 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/LogInfo.java @@ -0,0 +1,37 @@ +package org.apache.hadoop.yarn.server.nodemanager.webapp.dao; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; + +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class LogInfo { + + protected long totalSize; + + protected long pageSize; + + protected long pageCount; + + public LogInfo() { + } // JAXB needs this + + public LogInfo(long totalSize, long pageSize, long pageCount) { + this.totalSize = totalSize; + this.pageSize = pageSize; + this.pageCount = pageCount; + } + + public long getTotalSize() { + return totalSize; + } + + public long getPageSize() { + return pageSize; + } + + public long getPageCount() { + return pageCount; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java index 1f5590c..e56db1b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java @@ -370,6 +370,42 @@ public void testContainerLogs() throws IOException { .get(ClientResponse.class); responseText = response.getEntity(String.class); assertEquals(logMessage, responseText); + + // ask for log info of container and test pagination + long pageSize = 5; + long pageCount = (long) Math.ceil((float) logFile.length() / pageSize); + response = r.path("ws").path("v1").path("node").path("containerlogs-info") + .path(containerIdStr).path(filename) + .queryParam("pagesize", String.valueOf(pageSize)) + .accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + assertEquals("incorrect number of elements", 1, json.length()); + try { + JSONObject logInfo = json.getJSONObject("logInfo"); + assertEquals(logFile.length(), logInfo.getLong("totalSize")); + assertEquals(pageSize, logInfo.getLong("pageSize")); + assertEquals(pageCount, logInfo.getLong("pageCount")); + } catch (JSONException e) { + Assert.fail("incorrect structure of elemenets for containers-info api"); + } + + // ask for log content of container and test pagination + response = r.path("ws").path("v1").path("node").path("containerlogs") + .path(containerIdStr).path(filename) + .queryParam("pagesize", String.valueOf(pageSize)) + .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); + responseText = response.getEntity(String.class); + assertEquals(logMessage.substring(0, (int) pageSize), responseText); + + response = r.path("ws").path("v1").path("node").path("containerlogs") + .path(containerIdStr).path(filename) + .queryParam("pagesize", String.valueOf(pageSize)) + .queryParam("pageindex", "1").accept(MediaType.TEXT_PLAIN) + .get(ClientResponse.class); + responseText = response.getEntity(String.class); + assertEquals(logMessage.substring((int) pageSize, (int) pageSize * 2), + responseText); } public void verifyNodesXML(NodeList nodes) throws JSONException, Exception {