From 543264a792580a69384b64e68a92e989915838c5 Mon Sep 17 00:00:00 2001 From: Rohith Sharma K S Date: Thu, 20 Oct 2016 13:38:57 +0530 Subject: [PATCH] YARN-5742 --- .../reader/TimelineReaderServer.java | 7 +- .../reader/TimelineReaderWebServices.java | 1 + .../reader/TimelineReaderYarnUtilService.java | 417 +++++++++++++++++++++ 3 files changed, 424 insertions(+), 1 deletion(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderYarnUtilService.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java index 110d1dc..1bda6a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java @@ -57,6 +57,8 @@ private static final int SHUTDOWN_HOOK_PRIORITY = 30; static final String TIMELINE_READER_MANAGER_ATTR = "timeline.reader.manager"; + static final String TIMELINE_READER_MANAGER_CONFIG = + "timeline.reader.manager.config"; private HttpServer2 readerWebServer; private TimelineReaderManager timelineReaderManager; @@ -112,6 +114,7 @@ private void startTimelineReaderWebApp() { YarnConfiguration.TIMELINE_SERVICE_BIND_HOST, WebAppUtils.getTimelineReaderWebAppURL(conf)); LOG.info("Instantiating TimelineReaderWebApp at " + bindAddress); + System.out.println(conf.get("fs.defaultFS")); try { HttpServer2.Builder builder = 
new HttpServer2.Builder() .setName("timeline") @@ -128,13 +131,15 @@ private void startTimelineReaderWebApp() { StaticUserWebFilter.StaticUserFilter.class.getName(), options, new String[] {"/*"}); - readerWebServer.addJerseyResourcePackage( + readerWebServer + .addJerseyResourcePackage( TimelineReaderWebServices.class.getPackage().getName() + ";" + GenericExceptionHandler.class.getPackage().getName() + ";" + YarnJacksonJaxbJsonProvider.class.getPackage().getName(), "/*"); readerWebServer.setAttribute(TIMELINE_READER_MANAGER_ATTR, timelineReaderManager); + readerWebServer.setAttribute(TIMELINE_READER_MANAGER_CONFIG, conf); readerWebServer.start(); } catch (Exception e) { String msg = "TimelineReaderWebApp failed to start."; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java index db0c4e1..16bc4c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java @@ -44,6 +44,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout; diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderYarnUtilService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderYarnUtilService.java new file mode 100644 index 0000000..679d5da --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderYarnUtilService.java @@ -0,0 +1,417 @@ +package org.apache.hadoop.yarn.server.timelineservice.reader; + +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; + +import javax.servlet.ServletContext; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.ResponseBuilder; +import javax.ws.rs.core.Response.Status; +import javax.ws.rs.core.StreamingOutput; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import 
org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; +import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; +import org.apache.hadoop.yarn.util.Times; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import org.apache.hadoop.yarn.webapp.BadRequestException; +import org.apache.hadoop.yarn.webapp.NotFoundException; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; + +import com.google.common.base.Joiner; +import com.google.inject.Inject; +import com.google.inject.Singleton; + +@Singleton +@Path("/ws/v2/yarnutil") +public class TimelineReaderYarnUtilService { + + private static final Log LOG = + LogFactory.getLog(TimelineReaderYarnUtilService.class); + @Context + private ServletContext ctxt; + private static final String NM_DOWNLOAD_URI_STR = "/ws/v1/node/containers"; + private static final Joiner JOINER = Joiner.on(""); + private static final Joiner DOT_JOINER = Joiner.on(". 
"); + + protected void init(HttpServletResponse response) { + // clear content type + response.setContentType(null); + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + public TimelineAbout about(@Context HttpServletRequest req, + @Context HttpServletResponse res) { + init(res); + return TimelineUtils.createTimelineAbout("Timeline Reader API"); + } + + @GET + @Path("/containerlogs/{containerid}/{filename}") + @Produces({ MediaType.TEXT_PLAIN }) + @Public + @Unstable + public Response getLogs(@Context HttpServletRequest req, + @Context HttpServletResponse res, + @PathParam("containerid") String containerIdStr, + @PathParam("filename") String filename, + @QueryParam("format") String format, @QueryParam("size") String size) { + init(res); + ContainerId containerId; + try { + containerId = ContainerId.fromString(containerIdStr); + } catch (IllegalArgumentException ex) { + return createBadResponse(Status.NOT_FOUND, + "Invalid ContainerId: " + containerIdStr); + } + + final long length = parseLongParam(size); + + ApplicationId appId = + containerId.getApplicationAttemptId().getApplicationId(); + TimelineEntity appEntity = getApplicationEntity(appId.toString()); + + String appOwner = (String) appEntity.getInfo().get("YARN_APPLICATION_USER"); + String state = + (String) appEntity.getInfo().get("YARN_APPLICATION_STATE"); + YarnApplicationState appState = YarnApplicationState.valueOf(state); + + TimelineEntity containerEntity = null; + try { + containerEntity = + getContainerEntity(appId.toString(), containerId.toString()); + } catch (Exception ex) { + if (isFinishedState(appState)) { + // directly find logs from HDFS. 
+ return sendStreamOutputResponse(appId, appOwner, null, containerIdStr, + filename, format, length); + } + return createBadResponse(Status.INTERNAL_SERVER_ERROR, + "Can not get ContainerInfo for the container: " + containerId); + } + + String nodeId = + (String) containerEntity.getInfo().get("YARN_CONTAINER_ALLOCATED_HOST"); + + if (isRunningState(appState)) { + String nodeHttpAddress = (String) containerEntity.getInfo() + .get("YARN_CONTAINER_ALLOCATED_HOST_HTTP_ADDRESS"); + String uri = "/" + containerId.toString() + "/logs/" + filename; + String resURI = JOINER.join(nodeHttpAddress, NM_DOWNLOAD_URI_STR, uri); + String query = req.getQueryString(); + if (query != null && !query.isEmpty()) { + resURI += "?" + query; + } + ResponseBuilder response = + Response.status(HttpServletResponse.SC_TEMPORARY_REDIRECT); + response.header("Location", resURI); + return response.build(); + } else if (isFinishedState(appState)) { + return sendStreamOutputResponse(appId, appOwner, nodeId, containerIdStr, + filename, format, length); + } else { + return createBadResponse(Status.NOT_FOUND, + "The application is not at Running or Finished State."); + } + } + + private boolean isRunningState(YarnApplicationState appState) { + return appState == YarnApplicationState.RUNNING; + } + + private boolean isFinishedState(YarnApplicationState appState) { + return appState == YarnApplicationState.FINISHED + || appState == YarnApplicationState.FAILED + || appState == YarnApplicationState.KILLED; + } + + private Response createBadResponse(Status status, String errMessage) { + Response response = Response.status(status) + .entity(DOT_JOINER.join(status.toString(), errMessage)).build(); + return response; + } + + private Response sendStreamOutputResponse(ApplicationId appId, + String appOwner, String nodeId, String containerIdStr, String fileName, + String format, long bytes) { + String contentType = WebAppUtils.getDefaultLogContentType(); + if (format != null && !format.isEmpty()) { + 
contentType = WebAppUtils.getSupportedLogContentType(format); + if (contentType == null) { + String errorMessage = "The valid values for the parameter : format " + + "are " + WebAppUtils.listSupportedLogContentType(); + return Response.status(Status.BAD_REQUEST).entity(errorMessage).build(); + } + } + StreamingOutput stream = null; + try { + stream = getStreamingOutput(appId, appOwner, nodeId, containerIdStr, + fileName, bytes); + } catch (Exception ex) { + return createBadResponse(Status.INTERNAL_SERVER_ERROR, ex.getMessage()); + } + if (stream == null) { + return createBadResponse(Status.INTERNAL_SERVER_ERROR, + "Can not get log for container: " + containerIdStr); + } + ResponseBuilder response = Response.ok(stream); + response.header("Content-Type", contentType); + // Sending the X-Content-Type-Options response header with the value + // nosniff will prevent Internet Explorer from MIME-sniffing a response + // away from the declared content-type. + response.header("X-Content-Type-Options", "nosniff"); + return response.build(); + } + + private StreamingOutput getStreamingOutput(ApplicationId appId, + String appOwner, final String nodeId, final String containerIdStr, + final String logFile, final long bytes) throws IOException { + Configuration conf = getConfig(); + String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); + org.apache.hadoop.fs.Path remoteRootLogDir = new org.apache.hadoop.fs.Path( + conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); + org.apache.hadoop.fs.Path qualifiedRemoteRootLogDir = + FileContext.getFileContext(conf).makeQualified(remoteRootLogDir); + FileContext fc = + FileContext.getFileContext(qualifiedRemoteRootLogDir.toUri(), conf); + org.apache.hadoop.fs.Path remoteAppDir = null; + if (appOwner == null) { + org.apache.hadoop.fs.Path toMatch = LogAggregationUtils + .getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix); + FileStatus[] matching = 
fc.util().globStatus(toMatch); + if (matching == null || matching.length != 1) { + return null; + } + remoteAppDir = matching[0].getPath(); + } else { + remoteAppDir = LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir, + appId, appOwner, suffix); + } + final RemoteIterator nodeFiles; + nodeFiles = fc.listStatus(remoteAppDir); + if (!nodeFiles.hasNext()) { + return null; + } + + StreamingOutput stream = new StreamingOutput() { + + @Override + public void write(OutputStream os) + throws IOException, WebApplicationException { + byte[] buf = new byte[65535]; + boolean findLogs = false; + while (nodeFiles.hasNext()) { + final FileStatus thisNodeFile = nodeFiles.next(); + String nodeName = thisNodeFile.getPath().getName(); + if ((nodeId == null + || nodeName.contains(LogAggregationUtils.getNodeString(nodeId))) + && !nodeName.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { + AggregatedLogFormat.LogReader reader = null; + try { + reader = new AggregatedLogFormat.LogReader(conf, + thisNodeFile.getPath()); + DataInputStream valueStream; + LogKey key = new LogKey(); + valueStream = reader.next(key); + while (valueStream != null + && !key.toString().equals(containerIdStr)) { + // Next container + key = new LogKey(); + valueStream = reader.next(key); + } + if (valueStream == null) { + continue; + } + while (true) { + try { + String fileType = valueStream.readUTF(); + String fileLengthStr = valueStream.readUTF(); + long fileLength = Long.parseLong(fileLengthStr); + if (fileType.equalsIgnoreCase(logFile)) { + StringBuilder sb = new StringBuilder(); + sb.append("LogType:"); + sb.append(fileType + "\n"); + sb.append("Log Upload Time:"); + sb.append(Times.format(System.currentTimeMillis()) + "\n"); + sb.append("LogLength:"); + sb.append(fileLengthStr + "\n"); + sb.append("Log Contents:\n"); + byte[] b = sb.toString().getBytes(Charset.forName("UTF-8")); + os.write(b, 0, b.length); + + long toSkip = 0; + long totalBytesToRead = fileLength; + long skipAfterRead = 0; + if (bytes 
< 0) { + long absBytes = Math.abs(bytes); + if (absBytes < fileLength) { + toSkip = fileLength - absBytes; + totalBytesToRead = absBytes; + } + org.apache.hadoop.io.IOUtils.skipFully(valueStream, + toSkip); + } else { + if (bytes < fileLength) { + totalBytesToRead = bytes; + skipAfterRead = fileLength - bytes; + } + } + + long curRead = 0; + long pendingRead = totalBytesToRead - curRead; + int toRead = pendingRead > buf.length ? buf.length + : (int) pendingRead; + int len = valueStream.read(buf, 0, toRead); + while (len != -1 && curRead < totalBytesToRead) { + os.write(buf, 0, len); + curRead += len; + + pendingRead = totalBytesToRead - curRead; + toRead = pendingRead > buf.length ? buf.length + : (int) pendingRead; + len = valueStream.read(buf, 0, toRead); + } + org.apache.hadoop.io.IOUtils.skipFully(valueStream, + skipAfterRead); + sb = new StringBuilder(); + sb.append("\nEnd of LogType:" + fileType + "\n"); + b = sb.toString().getBytes(Charset.forName("UTF-8")); + os.write(b, 0, b.length); + findLogs = true; + } else { + long totalSkipped = 0; + long currSkipped = 0; + while (currSkipped != -1 && totalSkipped < fileLength) { + currSkipped = valueStream.skip(fileLength - totalSkipped); + totalSkipped += currSkipped; + } + } + } catch (EOFException eof) { + break; + } + } + } finally { + if (reader != null) { + reader.close(); + } + } + } + } + os.flush(); + if (!findLogs) { + throw new IOException( + "Can not find logs for container:" + containerIdStr); + } + } + }; + return stream; + } + + private long parseLongParam(String bytes) { + if (bytes == null || bytes.isEmpty()) { + return Long.MAX_VALUE; + } + return Long.parseLong(bytes); + } + + private TimelineEntity getApplicationEntity(String appId) { + TimelineReaderManager timelineReaderManager = getTimelineReaderManager(); + TimelineEntity entity = null; + try { + entity = timelineReaderManager.getEntity( + TimelineReaderWebServicesUtils.createTimelineReaderContext(null, null, + null, null, appId, 
TimelineEntityType.YARN_APPLICATION.toString(), + null), + TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(null, + null, "INFO", null)); + } catch (Exception e) { + handleException(e, "flowrunid"); + } + + if (entity == null) { + throw new NotFoundException("App " + appId + " not found"); + } + return entity; + } + + private TimelineEntity getContainerEntity(String appId, String containerId) { + TimelineReaderManager timelineReaderManager = getTimelineReaderManager(); + TimelineEntity entity = null; + try { + entity = timelineReaderManager.getEntity( + TimelineReaderWebServicesUtils.createTimelineReaderContext(null, null, + null, null, appId, "YARN_CONTAINER", containerId), + TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(null, + null, "INFO", null)); + } catch (Exception e) { + handleException(e, "flowrunid"); + } + + if (entity == null) { + throw new NotFoundException("App " + appId + " not found"); + } + return entity; + } + + private static void handleException(Exception e, String invalidNumMsg) + throws BadRequestException, WebApplicationException { + if (e instanceof NumberFormatException) { + throw new BadRequestException(invalidNumMsg + " is not a numeric value."); + } else if (e instanceof IllegalArgumentException) { + throw new BadRequestException( + e.getMessage() == null ? "Requested Invalid Field." : e.getMessage()); + } else if (e instanceof NotFoundException) { + throw (NotFoundException) e; + } else if (e instanceof TimelineParseException) { + throw new BadRequestException( + e.getMessage() == null ? "Filter Parsing failed." 
: e.getMessage()); + } else if (e instanceof BadRequestException) { + throw (BadRequestException) e; + } else { + LOG.error("Error while processing REST request", e); + throw new WebApplicationException(e, + Response.Status.INTERNAL_SERVER_ERROR); + } + } + + private TimelineReaderManager getTimelineReaderManager() { + return (TimelineReaderManager) ctxt + .getAttribute(TimelineReaderServer.TIMELINE_READER_MANAGER_ATTR); + } + + private Configuration getConfig() { + return (Configuration) ctxt + .getAttribute(TimelineReaderServer.TIMELINE_READER_MANAGER_CONFIG); + } + +} \ No newline at end of file -- 2.7.4 (Apple Git-66)