From c039aba8e9a66d5b020d0ddab6694020ff06a392 Mon Sep 17 00:00:00 2001 From: Rohith Sharma K S Date: Fri, 5 Oct 2018 16:05:31 +0530 Subject: [PATCH] YARN-5742 --- .../hadoop/yarn/server/webapp/LogWebService.java | 826 +++++++++++++++++++++ .../yarn/server/webapp/YarnWebServiceParams.java | 1 + .../yarn/server/webapp/TestLogWebService.java | 127 ++++ .../reader/TimelineReaderServer.java | 4 +- 4 files changed, 957 insertions(+), 1 deletion(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebService.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/webapp/TestLogWebService.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebService.java new file mode 100644 index 00000000000..489b0431e14 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebService.java @@ -0,0 +1,826 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.webapp; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.inject.Singleton; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientHandlerException; +import com.sun.jersey.api.client.ClientRequest; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.UniformInterfaceException; +import com.sun.jersey.api.client.config.ClientConfig; +import com.sun.jersey.api.client.config.DefaultClientConfig; +import com.sun.jersey.api.client.filter.ClientFilter; +import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; +import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; +import com.sun.jersey.core.util.MultivaluedMapImpl; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.http.JettyUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.client.AuthenticatedURL; +import org.apache.hadoop.security.authentication.client.AuthenticationException; +import org.apache.hadoop.security.authorize.AuthorizationException; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; +import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; +import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo; +import org.apache.hadoop.yarn.webapp.BadRequestException; +import org.apache.hadoop.yarn.webapp.ForbiddenException; +import org.apache.hadoop.yarn.webapp.NotFoundException; +import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; +import org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.GenericEntity; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.StreamingOutput; +import java.io.IOException; +import java.io.OutputStream; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.ConnectException; +import java.net.HttpURLConnection; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.net.URL; +import java.nio.charset.Charset; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType.YARN_CONTAINER; + +/** + * Support only ATSv2 client only. + */ +@Singleton @Path("/ws/v2/applicationlog") public class LogWebService { + private static final Logger LOG = + LoggerFactory.getLogger(LogWebService.class); + private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/"; + private static final String NM_DOWNLOAD_URI_STR = "/ws/v1/node/containers"; + private static final Joiner JOINER = Joiner.on(""); + private static final Joiner DOT_JOINER = Joiner.on(". "); + private static Configuration YARN_CONFIG = new YarnConfiguration();; + private static LogAggregationFileControllerFactory factory; + private static String base; + private static String defaultClusterid; + + private volatile Client webTimelineClient; + + static { + init(); + } + + // initialize all the common resources - order is important + private static void init() { + factory = new LogAggregationFileControllerFactory(YARN_CONFIG); + String address = + YARN_CONFIG.get(YarnConfiguration.TIMELINE_SERVICE_READER_WEBAPP_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_READER_WEBAPP_ADDRESS); + base = JOINER + .join(YarnConfiguration.useHttps(YARN_CONFIG) ? "https://" : "http://", + address, RESOURCE_URI_STR_V2); + defaultClusterid = YARN_CONFIG.get(YarnConfiguration.RM_CLUSTER_ID, + YarnConfiguration.DEFAULT_RM_CLUSTER_ID); + LOG.info("Initialized LogWeService with clusterid " + defaultClusterid); + } + + private Client createTimelineWebClient() { + ClientConfig cfg = new DefaultClientConfig(); + cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class); + Client client = new Client( + new URLConnectionClientHandler(new HttpURLConnectionFactory() { + @Override public HttpURLConnection getHttpURLConnection(URL url) + throws IOException { + AuthenticatedURL.Token token = new AuthenticatedURL.Token(); + HttpURLConnection conn = null; + try { + conn = new AuthenticatedURL().openConnection(url, token); + LOG.info("LogWeService:Connecetion created."); + } catch (AuthenticationException e) { + throw new IOException(e); + } + return conn; + } + }), cfg); + + int maxRetries = + YARN_CONFIG.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES); + long retryInterval = YARN_CONFIG.getLong( + YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS); + + // Set up Retry WebService Client + ClientJerseyRetryFilter retryFilter = new ClientJerseyRetryFilter( + new ClientConnectionRetry(maxRetries, retryInterval)); + client.addFilter(retryFilter); + return client; + } + + protected void initForReadableEndpoints(HttpServletResponse response) { + // clear content type + response.setContentType(null); + } + + /** + * Returns log file's name as well as current file size for a container. + * + * @param req HttpServletRequest + * @param res HttpServletResponse + * @param containerIdStr The container ID + * @param nmId The Node Manager NodeId + * @param redirectedFromNode Whether this is a redirected request from NM + * @return The log file's name and current file size + */ + @GET @Path("/containers/{containerid}/logs") + @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public Response getContainerLogsInfo(@Context HttpServletRequest req, + @Context HttpServletResponse res, + @PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr, + @QueryParam(YarnWebServiceParams.NM_ID) String nmId, + @QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE) + @DefaultValue("false") boolean redirectedFromNode, + @QueryParam(YarnWebServiceParams.CLUSTER_ID) String clusterId) { + ContainerId containerId = null; + initForReadableEndpoints(res); + try { + containerId = ContainerId.fromString(containerIdStr); + } catch (IllegalArgumentException e) { + throw new BadRequestException("invalid container id, " + containerIdStr); + } + + ApplicationId appId = + containerId.getApplicationAttemptId().getApplicationId(); + AppInfo appInfo; + try { + appInfo = getApp(req, appId.toString(), clusterId); + } catch (Exception ex) { + // directly find logs from HDFS. + return getContainerLogMeta(appId, null, null, containerIdStr, false); + } + // if the application finishes, directly find logs + // from HDFS. + if (isFinishedState(appInfo.getAppState())) { + return getContainerLogMeta(appId, null, null, containerIdStr, false); + } + if (isRunningState(appInfo.getAppState())) { + String appOwner = appInfo.getUser(); + String nodeHttpAddress = null; + if (nmId != null && !nmId.isEmpty()) { + try { + nodeHttpAddress = getNMWebAddressFromRM(YARN_CONFIG, nmId); + } catch (Exception ex) { + if (LOG.isDebugEnabled()) { + LOG.debug(ex.getMessage()); + } + } + } + if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()) { + ContainerInfo containerInfo; + try { + containerInfo = getContainer( + req, appId.toString(), containerId.toString(), clusterId); + } catch (Exception ex) { + // return log meta for the aggregated logs if exists. + // It will also return empty log meta for the local logs. + return getContainerLogMeta(appId, appOwner, null, containerIdStr, + true); + } + nodeHttpAddress = containerInfo.getNodeHttpAddress(); + // make sure nodeHttpAddress is not null and not empty. Otherwise, + // we would only get log meta for aggregated logs instead of + // re-directing the request + if (nodeHttpAddress == null || nodeHttpAddress.isEmpty() + || redirectedFromNode) { + // return log meta for the aggregated logs if exists. + // It will also return empty log meta for the local logs. + // If this is the redirect request from NM, we should not + // re-direct the request back. Simply output the aggregated log meta. + return getContainerLogMeta(appId, appOwner, null, containerIdStr, + true); + } + } + String uri = "/" + containerId.toString() + "/logs"; + String resURI = JOINER + .join(getAbsoluteNMWebAddress(nodeHttpAddress), NM_DOWNLOAD_URI_STR, + uri); + String query = req.getQueryString(); + if (query != null && !query.isEmpty()) { + resURI += "?" + query; + } + Response.ResponseBuilder response = + Response.status(HttpServletResponse.SC_TEMPORARY_REDIRECT); + response.header("Location", resURI); + return response.build(); + } else { + throw new NotFoundException( + "The application is not at Running or Finished State."); + } + } + + protected ContainerInfo getContainer(HttpServletRequest req, String appId, + String containerId, String clusterId) { + UserGroupInformation callerUGI = getUser(req); + String cId = clusterId != null ? clusterId : defaultClusterid; + MultivaluedMap params = new MultivaluedMapImpl(); + params.add("fields", "INFO"); + String path = JOINER.join("clusters/", cId, "/apps/", appId, "/entities/", + YARN_CONTAINER.toString(), "/", containerId); + TimelineEntity conEntity = null; + try { + if (callerUGI == null) { + conEntity = getEntity(path, params); + } else { + conEntity = + callerUGI.doAs(new PrivilegedExceptionAction() { + @Override public TimelineEntity run() throws Exception { + return getEntity(path, params); + } + }); + } + } catch (Exception e) { + rewrapAndThrowException(e); + } + if (conEntity == null) { + return null; + } + String nodeHttpAddress = (String) conEntity.getInfo() + .get(ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO); + + ContainerInfo info = new ContainerInfo(nodeHttpAddress); + return info; + } + + protected AppInfo getApp(HttpServletRequest req, String appId, + String clusterId) { + UserGroupInformation callerUGI = getUser(req); + + String cId = clusterId != null ? clusterId : defaultClusterid; + MultivaluedMap params = new MultivaluedMapImpl(); + params.add("fields", "INFO"); + String path = JOINER.join("clusters/", cId, "/apps/", appId); + TimelineEntity appEntity = null; + + try { + if (callerUGI == null) { + appEntity = getEntity(path, params); + } else { + appEntity = + callerUGI.doAs(new PrivilegedExceptionAction() { + @Override public TimelineEntity run() throws Exception { + return getEntity(path, params); + } + }); + } + } catch (Exception e) { + rewrapAndThrowException(e); + } + + if (appEntity == null) { + return null; + } + String appOwner = (String) appEntity.getInfo() + .get(ApplicationMetricsConstants.USER_ENTITY_INFO); + String state = (String) appEntity.getInfo() + .get(ApplicationMetricsConstants.STATE_EVENT_INFO); + YarnApplicationState appState = YarnApplicationState.valueOf(state); + AppInfo info = new AppInfo(appState, appOwner); + return info; + } + + /** + * Returns the contents of a container's log file in plain text. + * + * @param req HttpServletRequest + * @param res HttpServletResponse + * @param containerIdStr The container ID + * @param filename The name of the log file + * @param format The content type + * @param size the size of the log file + * @param nmId The Node Manager NodeId + * @param redirectedFromNode Whether this is the redirect request from NM + * @return The contents of the container's log file + */ + @GET @Path("/containers/{containerid}/logs/{filename}") + @Produces({ MediaType.TEXT_PLAIN }) @InterfaceAudience.Public + @InterfaceStability.Unstable public Response getContainerLogFile( + @Context HttpServletRequest req, @Context HttpServletResponse res, + @PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr, + @PathParam(YarnWebServiceParams.CONTAINER_LOG_FILE_NAME) String filename, + @QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_FORMAT) String format, + @QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_SIZE) String size, + @QueryParam(YarnWebServiceParams.NM_ID) String nmId, + @QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE) + boolean redirectedFromNode, + @QueryParam(YarnWebServiceParams.CLUSTER_ID) String clusterId) { + return getLogs(req, res, containerIdStr, filename, format, size, nmId, + redirectedFromNode, clusterId); + } + + //TODO: YARN-4993: Refactory ContainersLogsBlock, AggregatedLogsBlock and + // container log webservice introduced in AHS to minimize + // the duplication. + @GET @Path("/containerlogs/{containerid}/{filename}") + @Produces({ MediaType.TEXT_PLAIN + "; " + JettyUtils.UTF_8 }) + @InterfaceAudience.Public @InterfaceStability.Unstable + public Response getLogs(@Context HttpServletRequest req, + @Context HttpServletResponse res, + @PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr, + @PathParam(YarnWebServiceParams.CONTAINER_LOG_FILE_NAME) String filename, + @QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_FORMAT) String format, + @QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_SIZE) String size, + @QueryParam(YarnWebServiceParams.NM_ID) String nmId, + @QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE) + @DefaultValue("false") boolean redirectedFromNode, + @QueryParam(YarnWebServiceParams.CLUSTER_ID) String clusterId) { + initForReadableEndpoints(res); + ContainerId containerId; + try { + containerId = ContainerId.fromString(containerIdStr); + } catch (IllegalArgumentException ex) { + return createBadResponse(Response.Status.NOT_FOUND, + "Invalid ContainerId: " + containerIdStr); + } + + final long length = parseLongParam(size); + + ApplicationId appId = + containerId.getApplicationAttemptId().getApplicationId(); + AppInfo appInfo; + try { + appInfo = getApp(req, appId.toString(), clusterId); + } catch (Exception ex) { + // directly find logs from HDFS. + return sendStreamOutputResponse(appId, null, null, containerIdStr, + filename, format, length, false); + } + String appOwner = appInfo.getUser(); + if (isFinishedState(appInfo.getAppState())) { + // directly find logs from HDFS. + return sendStreamOutputResponse(appId, appOwner, null, containerIdStr, + filename, format, length, false); + } + + if (isRunningState(appInfo.getAppState())) { + String nodeHttpAddress = null; + if (nmId != null && !nmId.isEmpty()) { + try { + nodeHttpAddress = getNMWebAddressFromRM(YARN_CONFIG, nmId); + } catch (Exception ex) { + if (LOG.isDebugEnabled()) { + LOG.debug(ex.getMessage()); + } + } + } + if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()) { + ContainerInfo containerInfo; + try { + containerInfo = + getContainer(req, appId.toString(), containerId.toString(), + clusterId); + } catch (Exception ex) { + // output the aggregated logs + return sendStreamOutputResponse(appId, appOwner, null, containerIdStr, + filename, format, length, true); + } + nodeHttpAddress = containerInfo.getNodeHttpAddress(); + // make sure nodeHttpAddress is not null and not empty. Otherwise, + // we would only get aggregated logs instead of re-directing the + // request. + // If this is the redirect request from NM, we should not re-direct the + // request back. Simply output the aggregated logs. + if (nodeHttpAddress == null || nodeHttpAddress.isEmpty() + || redirectedFromNode) { + // output the aggregated logs + return sendStreamOutputResponse(appId, appOwner, null, containerIdStr, + filename, format, length, true); + } + } + String uri = "/" + containerId.toString() + "/logs/" + filename; + String resURI = JOINER + .join(getAbsoluteNMWebAddress(nodeHttpAddress), NM_DOWNLOAD_URI_STR, + uri); + String query = req.getQueryString(); + if (query != null && !query.isEmpty()) { + resURI += "?" + query; + } + Response.ResponseBuilder response = + Response.status(HttpServletResponse.SC_TEMPORARY_REDIRECT); + response.header("Location", resURI); + return response.build(); + } else { + return createBadResponse(Response.Status.NOT_FOUND, + "The application is not at Running or Finished State."); + } + } + + private boolean isRunningState(YarnApplicationState appState) { + return appState == YarnApplicationState.RUNNING; + } + + private boolean isFinishedState(YarnApplicationState appState) { + return appState == YarnApplicationState.FINISHED + || appState == YarnApplicationState.FAILED + || appState == YarnApplicationState.KILLED; + } + + private Response createBadResponse(Response.Status status, + String errMessage) { + Response response = Response.status(status) + .entity(DOT_JOINER.join(status.toString(), errMessage)).build(); + return response; + } + + private Response sendStreamOutputResponse(ApplicationId appId, + String appOwner, String nodeId, String containerIdStr, String fileName, + String format, long bytes, boolean printEmptyLocalContainerLog) { + String contentType = WebAppUtils.getDefaultLogContentType(); + if (format != null && !format.isEmpty()) { + contentType = WebAppUtils.getSupportedLogContentType(format); + if (contentType == null) { + String errorMessage = + "The valid values for the parameter : format " + "are " + + WebAppUtils.listSupportedLogContentType(); + return Response.status(Response.Status.BAD_REQUEST).entity(errorMessage) + .build(); + } + } + StreamingOutput stream = null; + try { + stream = + getStreamingOutput(appId, appOwner, nodeId, containerIdStr, fileName, + bytes, printEmptyLocalContainerLog); + } catch (Exception ex) { + return createBadResponse(Response.Status.INTERNAL_SERVER_ERROR, + ex.getMessage()); + } + Response.ResponseBuilder response = Response.ok(stream); + response.header("Content-Type", contentType); + // Sending the X-Content-Type-Options response header with the value + // nosniff will prevent Internet Explorer from MIME-sniffing a response + // away from the declared content-type. + response.header("X-Content-Type-Options", "nosniff"); + return response.build(); + } + + private StreamingOutput getStreamingOutput(final ApplicationId appId, + final String appOwner, final String nodeId, final String containerIdStr, + final String logFile, final long bytes, + final boolean printEmptyLocalContainerLog) throws IOException { + StreamingOutput stream = new StreamingOutput() { + + @Override public void write(OutputStream os) + throws IOException, WebApplicationException { + ContainerLogsRequest request = new ContainerLogsRequest(); + request.setAppId(appId); + request.setAppOwner(appOwner); + request.setContainerId(containerIdStr); + request.setBytes(bytes); + request.setNodeId(nodeId); + Set logTypes = new HashSet<>(); + logTypes.add(logFile); + request.setLogTypes(logTypes); + boolean findLogs = factory.getFileControllerForRead(appId, appOwner) + .readAggregatedLogs(request, os); + if (!findLogs) { + os.write(("Can not find logs for container:" + containerIdStr) + .getBytes(Charset.forName("UTF-8"))); + } else { + if (printEmptyLocalContainerLog) { + StringBuilder sb = new StringBuilder(); + sb.append(containerIdStr + "\n"); + sb.append("LogAggregationType: " + ContainerLogAggregationType.LOCAL + + "\n"); + sb.append("LogContents:\n"); + sb.append(getNoRedirectWarning() + "\n"); + os.write(sb.toString().getBytes(Charset.forName("UTF-8"))); + } + } + } + }; + return stream; + } + + private long parseLongParam(String bytes) { + if (bytes == null || bytes.isEmpty()) { + return Long.MAX_VALUE; + } + return Long.parseLong(bytes); + } + + private Response getContainerLogMeta(ApplicationId appId, String appOwner, + final String nodeId, final String containerIdStr, + boolean emptyLocalContainerLogMeta) { + try { + ContainerLogsRequest request = new ContainerLogsRequest(); + request.setAppId(appId); + request.setAppOwner(appOwner); + request.setContainerId(containerIdStr); + request.setNodeId(nodeId); + List containerLogMeta = + factory.getFileControllerForRead(appId, appOwner) + .readAggregatedLogsMeta(request); + if (containerLogMeta.isEmpty()) { + throw new NotFoundException( + "Can not get log meta for container: " + containerIdStr); + } + List containersLogsInfo = new ArrayList<>(); + for (ContainerLogMeta meta : containerLogMeta) { + ContainerLogsInfo logInfo = + new ContainerLogsInfo(meta, ContainerLogAggregationType.AGGREGATED); + containersLogsInfo.add(logInfo); + } + if (emptyLocalContainerLogMeta) { + ContainerLogMeta emptyMeta = + new ContainerLogMeta(containerIdStr, "N/A"); + ContainerLogsInfo empty = + new ContainerLogsInfo(emptyMeta, ContainerLogAggregationType.LOCAL); + containersLogsInfo.add(empty); + } + GenericEntity> meta = + new GenericEntity>(containersLogsInfo) { + }; + Response.ResponseBuilder response = Response.ok(meta); + // Sending the X-Content-Type-Options response header with the value + // nosniff will prevent Internet Explorer from MIME-sniffing a response + // away from the declared content-type. + response.header("X-Content-Type-Options", "nosniff"); + return response.build(); + } catch (Exception ex) { + throw new WebApplicationException(ex); + } + } + + @InterfaceAudience.Private @VisibleForTesting + public static String getNoRedirectWarning() { + return "We do not have NodeManager web address, so we can not " + + "re-direct the request to related NodeManager " + + "for local container logs."; + } + + private String getAbsoluteNMWebAddress(String nmWebAddress) { + if (nmWebAddress.contains(WebAppUtils.HTTP_PREFIX) || nmWebAddress + .contains(WebAppUtils.HTTPS_PREFIX)) { + return nmWebAddress; + } + return WebAppUtils.getHttpSchemePrefix(YARN_CONFIG) + nmWebAddress; + } + + @VisibleForTesting @InterfaceAudience.Private + public String getNMWebAddressFromRM(Configuration configuration, + String nodeId) + throws ClientHandlerException, UniformInterfaceException, JSONException { + JSONObject nodeInfo = + YarnWebServiceUtils.getNodeInfoFromRMWebService(configuration, nodeId) + .getJSONObject("node"); + return nodeInfo.has("nodeHTTPAddress") ? + nodeInfo.getString("nodeHTTPAddress") : null; + } + + // Class to handle retry + static class ClientConnectionRetry { + + // maxRetries < 0 means keep trying + @InterfaceAudience.Private @VisibleForTesting private int maxRetries; + + @InterfaceAudience.Private @VisibleForTesting private long retryInterval; + + // Indicates if retries happened last time. Only tests should read it. + // In unit tests, retryOn() calls should _not_ be concurrent. + private boolean retried = false; + + @InterfaceAudience.Private @VisibleForTesting boolean getRetired() { + return retried; + } + + // Constructor with default retry settings + public ClientConnectionRetry(int inputMaxRetries, long inputRetryInterval) { + this.maxRetries = inputMaxRetries; + this.retryInterval = inputRetryInterval; + } + + public Object retryOn(ClientRetryOp op) + throws RuntimeException, IOException { + int leftRetries = maxRetries; + retried = false; + + // keep trying + while (true) { + try { + // try perform the op, if fail, keep retrying + return op.run(); + } catch (IOException | RuntimeException e) { + // break if there's no retries left + if (leftRetries == 0) { + break; + } + if (op.shouldRetryOn(e)) { + logException(e, leftRetries); + } else { + throw e; + } + } + if (leftRetries > 0) { + leftRetries--; + } + retried = true; + try { + // sleep for the given time interval + Thread.sleep(retryInterval); + } catch (InterruptedException ie) { + LOG.error("Client retry sleep interrupted! "); + } + } + throw new RuntimeException("Connection retries limit exceeded."); + } + + ; + + private void logException(Exception e, int leftRetries) { + if (leftRetries > 0) { + LOG.error("Exception caught by ClientConnectionRetry," + " will try " + + leftRetries + " more time(s).\nMessage: " + e.getMessage()); + } else { + // note that maxRetries may be -1 at the very beginning + LOG.error("ConnectionException caught by ClientConnectionRetry," + + " will keep retrying.\nMessage: " + e.getMessage()); + } + } + } + + private static class ClientJerseyRetryFilter extends ClientFilter { + + private ClientConnectionRetry connectionRetry; + + ClientJerseyRetryFilter(ClientConnectionRetry retry) { + connectionRetry = retry; + } + + @Override public ClientResponse handle(final ClientRequest cr) + throws ClientHandlerException { + // Set up the retry operation + ClientRetryOp jerseyRetryOp = new ClientRetryOp() { + @Override public Object run() { + // Try pass the request, if fail, keep retrying + return getNext().handle(cr); + } + + @Override public boolean shouldRetryOn(Exception e) { + // Only retry on connection exceptions + return (e instanceof ClientHandlerException) && ( + e.getCause() instanceof ConnectException || e + .getCause() instanceof SocketTimeoutException || e + .getCause() instanceof SocketException); + } + }; + try { + return (ClientResponse) connectionRetry.retryOn(jerseyRetryOp); + } catch (IOException e) { + throw new ClientHandlerException( + "Jersey retry failed!\nMessage: " + e.getMessage()); + } + } + } + + // Abstract class for an operation that should be retried by client + private static abstract class ClientRetryOp { + // The operation that should be retried + public abstract Object run() throws IOException; + + // The method to indicate if we should retry given the incoming exception + public abstract boolean shouldRetryOn(Exception e); + } + + protected static UserGroupInformation getUser(HttpServletRequest req) { + String remoteUser = req.getRemoteUser(); + UserGroupInformation callerUGI = null; + if (remoteUser != null) { + callerUGI = UserGroupInformation.createRemoteUser(remoteUser); + } + return callerUGI; + } + + protected static class AppInfo { + private YarnApplicationState appState; + private String user; + + AppInfo(YarnApplicationState appState, String user) { + this.appState = appState; + this.user = user; + } + + public YarnApplicationState getAppState() { + return this.appState; + } + + public String getUser() { + return this.user; + } + } + + protected static class ContainerInfo { + private String nodeHttpAddress; + + ContainerInfo(String nodeHttpAddress) { + this.nodeHttpAddress = nodeHttpAddress; + } + + public String getNodeHttpAddress() { + return nodeHttpAddress; + } + } + + @VisibleForTesting protected TimelineEntity getEntity(String path, + MultivaluedMap params) throws IOException { + ClientResponse resp = + getClient().resource(base).path(path).queryParams(params) + .accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + if (resp == null + || resp.getStatusInfo().getStatusCode() != ClientResponse.Status.OK + .getStatusCode()) { + String msg = + "Response from the timeline reader server is " + ((resp == null) ? + "null" : + "not successful," + " HTTP error code: " + resp.getStatus() + + ", Server response:\n" + resp.getEntity(String.class)); + LOG.error(msg); + throw new IOException(msg); + } + TimelineEntity entity = resp.getEntity(TimelineEntity.class); + return entity; + } + + private Client getClient() { + if (webTimelineClient == null) { + synchronized (LogWebService.class) { + if (webTimelineClient == null) { + webTimelineClient = createTimelineWebClient(); + } + } + } + return webTimelineClient; + } + + private static void rewrapAndThrowException(Exception e) { + if (e instanceof UndeclaredThrowableException) { + rewrapAndThrowThrowable(e.getCause()); + } else { + rewrapAndThrowThrowable(e); + } + } + + private static void rewrapAndThrowThrowable(Throwable t) { + if (t instanceof AuthorizationException) { + throw new ForbiddenException(t); + } else { + throw new WebApplicationException(t); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java index 479cc758138..5f96f231524 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java @@ -35,4 +35,5 @@ String RESPONSE_CONTENT_SIZE = "size"; String NM_ID = "nm.id"; String REDIRECTED_FROM_NODE = "redirected_from_node"; + String CLUSTER_ID = "clusterid"; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/webapp/TestLogWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/webapp/TestLogWebService.java new file mode 100644 index 00000000000..98dae98267a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/webapp/TestLogWebService.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.webapp; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class TestLogWebService { + + private HttpServletRequest request; + private LogWebServiceTest logWebService; + private static TimelineEntity entity; + private ApplicationId appId; + private ContainerId cId; + private String user = "user1"; + private Map entities; + private String nodeHttpAddress = "localhost:0"; + + @Before public void setup() throws Exception { + appId = ApplicationId.fromString("application_1518143905142_509690"); + cId = + ContainerId.fromString("container_e138_1518143905142_509690_01_000001"); + entities = new HashMap<>(); + generateEntity(); + request = Mockito.mock(HttpServletRequest.class); + Mockito.when(request.getRemoteUser()) + .thenReturn(System.getProperty("user.name")); + logWebService = new LogWebServiceTest(); + + } + + @Test public void testGetApp() { + + LogWebService.AppInfo app = + logWebService.getApp(request, appId.toString(), null); + Assert.assertEquals("RUNNING", app.getAppState().toString()); + Assert.assertEquals(user, app.getUser()); + } + + @Test public void testGetContainer() { + LogWebService.ContainerInfo container = logWebService + .getContainer(request, appId.toString(), cId.toString(), null); + Assert.assertEquals(nodeHttpAddress, container.getNodeHttpAddress()); + } + + class LogWebServiceTest extends LogWebService { + + @Override protected TimelineEntity getEntity(String path, + MultivaluedMap params) throws IOException { + if (path.endsWith(cId.toString())) { + return entities.get(cId.toString()); + } else if (path.endsWith(appId.toString())) { + return entities.get(appId.toString()); + } else { + throw new IOException(); + } + } + } + + private void generateEntity() { + createAppEntities(); + createContainerEntities(); + } + + private void createContainerEntities() { + TimelineEntity timelineEntity = + generateEntity(TimelineEntityType.YARN_APPLICATION.toString(), + appId.toString()); + timelineEntity.addInfo(ApplicationMetricsConstants.USER_ENTITY_INFO, user); + timelineEntity + .addInfo(ApplicationMetricsConstants.STATE_EVENT_INFO, "RUNNING"); + entities.put(appId.toString(), timelineEntity); + } + + private void createAppEntities() { + TimelineEntity timelineEntity = + generateEntity(TimelineEntityType.YARN_CONTAINER.toString(), + cId.toString()); + timelineEntity + .addInfo(ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO, + nodeHttpAddress); + entities.put(cId.toString(), timelineEntity); + } + + private static TimelineEntity generateEntity(String entityType, + String entityId) { + TimelineEntity entity = new TimelineEntity(); + entity.setId(entityId); + entity.setType(entityType); + entity.setCreatedTime(System.currentTimeMillis()); + return entity; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java index bd2c428c4fb..8f1e7d74e41 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.security.TimelineReaderWhitelistAuthorizationFilterInitializer; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader; import org.apache.hadoop.yarn.server.util.timeline.TimelineServerUtils; +import org.apache.hadoop.yarn.server.webapp.LogWebService; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; @@ -201,7 +202,8 @@ private void startTimelineReaderWebApp() { readerWebServer.addJerseyResourcePackage( TimelineReaderWebServices.class.getPackage().getName() + ";" + GenericExceptionHandler.class.getPackage().getName() + ";" - + YarnJacksonJaxbJsonProvider.class.getPackage().getName(), + + YarnJacksonJaxbJsonProvider.class.getPackage().getName()+ ";" + + LogWebService.class.getPackage().getName(), "/*"); readerWebServer.setAttribute(TIMELINE_READER_MANAGER_ATTR, timelineReaderManager); -- 2.13.6 (Apple Git-96)