diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index cecc1ce..5aad4fa 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1005,6 +1005,9 @@
   public static final String ATS_PREFIX = YARN_PREFIX + "ats.";

+  /** ATS store class */
+  public static final String ATS_STORE = ATS_PREFIX + "store.class";
+
   // Redirect ATS web configurations to AHS as ATS REST APIs are hosted by
   // AHS web server
   /** The address of the ATS web application.*/
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index c493742..642be72 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -588,6 +588,14 @@
     <value>org.apache.hadoop.yarn.server.applicationhistoryservice.NullApplicationHistoryStore</value>
   </property>

+  <!-- NOTE(review): the default value below is the ApplicationTimelineStore
+       type itself; the patch's own TODO in ApplicationHistoryServer says this
+       placeholder should be replaced with a concrete default store class. -->
+  <property>
+    <description>Store class name for application timeline store</description>
+    <name>yarn.ats.store.class</name>
+    <value>org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.ApplicationTimelineStore</value>
+  </property>
+
   <property>
     <description>The hostname of the NM.</description>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java index 3a864c8..f053c5b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java @@ -27,11 +27,13 @@ import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.service.Service; import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.ApplicationTimelineStore; import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp; import org.apache.hadoop.yarn.webapp.WebApp; import org.apache.hadoop.yarn.webapp.WebApps; @@ -51,6 +53,7 @@ ApplicationHistoryClientService ahsClientService; ApplicationHistoryManager historyManager; + ApplicationTimelineStore timelineStore; private WebApp webApp; public ApplicationHistoryServer() { @@ -63,6 +66,8 @@ protected void serviceInit(Configuration conf) throws Exception { ahsClientService = createApplicationHistoryClientService(historyManager); addService(ahsClientService); addService((Service) 
historyManager); + timelineStore = createApplicationTimeStore(conf); + addIfService(timelineStore); super.serviceInit(conf); } @@ -135,6 +140,15 @@ protected ApplicationHistoryManager createApplicationHistoryManager( return new ApplicationHistoryManagerImpl(); } + protected ApplicationTimelineStore createApplicationTimeStore( + Configuration conf) { + // TODO: need to replace the first ApplicationTimelineStore.class with the + // default class + return ReflectionUtils.newInstance(conf.getClass( + YarnConfiguration.ATS_STORE, ApplicationTimelineStore.class, + ApplicationTimelineStore.class), conf); + } + protected void startWebApp() { String bindAddress = WebAppUtils.getAHSWebAppURLWithoutScheme(getConfig()); LOG.info("Instantiating AHSWebApp at " + bindAddress); @@ -148,7 +162,8 @@ protected void startWebApp() { YarnConfiguration.AHS_WEBAPP_SPNEGO_USER_NAME_KEY) .withHttpSpnegoKeytabKey( YarnConfiguration.AHS_WEBAPP_SPNEGO_KEYTAB_FILE_KEY) - .at(bindAddress).start(new AHSWebApp(historyManager)); + .at(bindAddress) + .start(new AHSWebApp(historyManager, timelineStore)); } catch (Exception e) { String msg = "AHSWebApp failed to start."; LOG.error(msg, e); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java index 81f8383..d2cfc32 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java @@ -21,24 +21,31 @@ import 
org.apache.hadoop.yarn.server.api.ApplicationContext; import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManager; +import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.ApplicationTimelineStore; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import org.apache.hadoop.yarn.webapp.WebApp; +import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; import org.apache.hadoop.yarn.webapp.YarnWebParams; public class AHSWebApp extends WebApp implements YarnWebParams { private final ApplicationHistoryManager applicationHistoryManager; + private final ApplicationTimelineStore applicationTimelineStore; - public AHSWebApp(ApplicationHistoryManager applicationHistoryManager) { + public AHSWebApp(ApplicationHistoryManager applicationHistoryManager, + ApplicationTimelineStore applicationTimelineStore) { this.applicationHistoryManager = applicationHistoryManager; + this.applicationTimelineStore = applicationTimelineStore; } @Override public void setup() { - bind(JAXBContextResolver.class); + bind(YarnJacksonJaxbJsonProvider.class); bind(AHSWebServices.class); + bind(ATSWebServices.class); bind(GenericExceptionHandler.class); bind(ApplicationContext.class).toInstance(applicationHistoryManager); + bind(ApplicationTimelineStore.class).toInstance(applicationTimelineStore); route("/", AHSController.class); route(pajoin("/apps", APP_STATE), AHSController.class); route(pajoin("/app", APPLICATION_ID), AHSController.class, "app"); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/ATSWebServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/ATSWebServices.java new file mode 100644 index 0000000..e589ef0 --- /dev/null +++ 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;

import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;

import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvent;
import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvents;
import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.ApplicationTimelineReader.Field;
import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.ApplicationTimelineStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.Entity;
import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.EntityInfo;
import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.Event;
import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.NameValuePair;
import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.TimelinePutRequest;
import org.apache.hadoop.yarn.webapp.BadRequestException;

import com.google.inject.Inject;
import com.google.inject.Singleton;

/**
 * REST end-point of the application timeline service, mounted at
 * {@code /ws/v1/apptimeline}. Query parameters are parsed from their raw
 * string form here; malformed numeric or field values are reported to the
 * client as HTTP 400 (via {@link BadRequestException}) rather than 500.
 *
 * NOTE(review): generic type parameters in this file were reconstructed from
 * usage (the patch text had them stripped by extraction); the element types of
 * {@code ATSEntity} filter/info maps are assumed to be
 * {@code Map<String, Object>} — confirm against the records classes.
 */
@Singleton
@Path("/ws/v1/apptimeline")
public class ATSWebServices {

  /** Backing store for timeline entities; injected by the web app module. */
  private ApplicationTimelineStore store;

  @Inject
  public ATSWebServices(ApplicationTimelineStore store) {
    this.store = store;
  }

  /** Simple JAXB bean returned by the root resource as a service banner. */
  @XmlRootElement(name = "about")
  @XmlAccessorType(XmlAccessType.NONE)
  @Public
  @Unstable
  public static class AboutInfo {

    private String about;

    public AboutInfo() {

    }

    public AboutInfo(String about) {
      this.about = about;
    }

    @XmlElement(name = "About")
    public String getAbout() {
      return about;
    }

    public void setAbout(String about) {
      this.about = about;
    }

  }

  /**
   * Returns a short description of the web service.
   */
  @GET
  @Path("/")
  @Produces({ MediaType.APPLICATION_JSON/*, MediaType.APPLICATION_XML*/ })
  public AboutInfo about(
      @Context HttpServletRequest req,
      @Context HttpServletResponse res) {
    init(res);
    return new AboutInfo("Application Timeline API");
  }

  /**
   * Returns the entities of the given type that match the optional window,
   * limit and filter parameters. An empty {@link ATSEntities} is returned
   * when nothing matches.
   *
   * @throws BadRequestException if a numeric parameter or a requested field
   *         cannot be parsed
   */
  @GET
  @Path("/{entityType}")
  @Produces({ MediaType.APPLICATION_JSON/*, MediaType.APPLICATION_XML*/ })
  public ATSEntities getEntities(
      @Context HttpServletRequest req,
      @Context HttpServletResponse res,
      @PathParam("entityType") String entityType,
      @QueryParam("primaryFilter") String primaryFilter,
      @QueryParam("secondaryFilter") String secondaryFilter,
      @QueryParam("windowStart") String windowStart,
      @QueryParam("windowEnd") String windowEnd,
      @QueryParam("limit") String limit,
      @QueryParam("fields") String fields) {
    init(res);
    List<EntityInfo> timeline = null;
    try {
      timeline = store.getEntities(
          parseStr(entityType),
          parseLongStr(limit),
          parseLongStr(windowStart),
          parseLongStr(windowEnd),
          parsePairStr(primaryFilter, ":"),
          parsePairsStr(secondaryFilter, ",", ":"),
          parseFieldsStr(fields, ","));
    } catch (NumberFormatException e) {
      // NumberFormatException extends IllegalArgumentException, so it must be
      // caught first to keep the more specific message.
      throw new BadRequestException(
          "windowStart, windowEnd or limit is not a numeric value.");
    } catch (IllegalArgumentException e) {
      throw new BadRequestException(
          "requested invalid field.");
    }
    ATSEntities atsEntities = new ATSEntities();
    if (timeline != null) {
      for (EntityInfo entity : timeline) {
        atsEntities.addEntity(convertFrom(entity));
      }
    }
    return atsEntities;
  }

  /**
   * Returns a single entity, or HTTP 404 if the store has no entity with the
   * given type and id.
   *
   * @throws BadRequestException if a requested field cannot be parsed
   * @throws WebApplicationException with NOT_FOUND when the entity is absent
   */
  @GET
  @Path("/{entityType}/{entityId}")
  @Produces({ MediaType.APPLICATION_JSON/*, MediaType.APPLICATION_XML*/ })
  public ATSEntity getEntity(
      @Context HttpServletRequest req,
      @Context HttpServletResponse res,
      @PathParam("entityType") String entityType,
      @PathParam("entityId") String entityId,
      @QueryParam("fields") String fields) {
    init(res);
    Entity entity = new Entity(parseStr(entityId), parseStr(entityType));
    EntityInfo entityInfo = null;
    try {
      entityInfo = store.getEntityInfo(entity, parseFieldsStr(fields, ","));
    } catch (IllegalArgumentException e) {
      throw new BadRequestException(
          "requested invalid field.");
    }
    if (entityInfo == null) {
      throw new WebApplicationException(Response.Status.NOT_FOUND);
    }
    return convertFrom(entityInfo);
  }

  /**
   * Returns the event timelines of the requested entities.
   *
   * NOTE(review): because this literal path segment shadows the
   * {entityId} template above, an entity whose id is exactly "events" is not
   * reachable through {@link #getEntity}.
   *
   * @throws BadRequestException if a numeric parameter cannot be parsed
   */
  @GET
  @Path("/{entityType}/events")
  @Produces({ MediaType.APPLICATION_JSON/*, MediaType.APPLICATION_XML*/ })
  public ATSEvents getEvents(
      @Context HttpServletRequest req,
      @Context HttpServletResponse res,
      @PathParam("entityType") String entityType,
      @QueryParam("entityId") String entityId,
      @QueryParam("eventType") String eventType,
      @QueryParam("windowStart") String windowStart,
      @QueryParam("windowEnd") String windowEnd,
      @QueryParam("limit") String limit) {
    init(res);
    Map<Entity, List<Event>> timeline = null;
    try {
      timeline = store.getEntityTimelines(
          parseStr(entityType),
          parseArrayStr(entityId, ","),
          parseLongStr(limit),
          parseLongStr(windowStart),
          parseLongStr(windowEnd),
          parseArrayStr(eventType, ","));
    } catch (NumberFormatException e) {
      throw new BadRequestException(
          "windowStart, windowEnd or limit is not a numeric value.");
    }
    ATSEvents atsTimeline = new ATSEvents();
    if (timeline != null) {
      for (Map.Entry<Entity, List<Event>> events : timeline.entrySet()) {
        ATSEvents.ATSEventsOfOneEntity atsEvents =
            new ATSEvents.ATSEventsOfOneEntity();
        atsEvents.setEntityId(events.getKey().getId());
        atsEvents.setEntityType(events.getKey().getType());
        for (Event event : events.getValue()) {
          ATSEvent atsEvent = new ATSEvent();
          atsEvent.setTimestamp(event.getTs());
          atsEvent.setEventType(event.getType());
          atsEvent.addEventInfo(event.getInfo());
          atsEvents.addEntity(atsEvent);
        }
        atsTimeline.addEvent(atsEvents);
      }
    }
    return atsTimeline;
  }

  /**
   * Stores the posted entities in the timeline store. A null or empty body is
   * accepted as a no-op. Fix: the original dereferenced
   * {@code entities.getEntities()} and each entity's optional events/related
   * maps without null checks, turning a sparse request into an HTTP 500.
   */
  @POST
  @Path("/")
  @Consumes({ MediaType.APPLICATION_JSON/*, MediaType.APPLICATION_XML*/ })
  public Response postEntities(
      ATSEntities entities) {
    if (entities == null || entities.getEntities() == null) {
      return Response.status(Response.Status.OK).build();
    }
    List<TimelinePutRequest> requests = new ArrayList<TimelinePutRequest>();
    for (ATSEntity entity : entities.getEntities()) {
      TimelinePutRequest request = TimelinePutRequest.newInstance(
          convertFrom(entity), entity.getKeyTs(),
          convertFrom(entity.getEvents()),
          convertEntityFrom(entity.getRelatedEntities()),
          entity.getPrimaryFilters(),
          entity.getOtherInfo());
      requests.add(request);
    }
    store.put(requests);
    // TODO: return error codes
    return Response.status(Response.Status.OK).build();
  }

  // Clear any container-assigned content type so the JAX-RS runtime derives
  // it from the @Produces annotation of the matched method.
  private void init(HttpServletResponse response) {
    response.setContentType(null);
  }

  /** Converts a store-side EntityInfo into the REST-facing ATSEntity. */
  private static ATSEntity convertFrom(EntityInfo entity) {
    ATSEntity atsEntity = new ATSEntity();
    atsEntity.setEntityId(entity.getEntity().getId());
    atsEntity.setEntityType(entity.getEntity().getType());
    atsEntity.setKeyTs(entity.getKeyTs());
    List<Event> events = entity.getEvents();
    if (events != null) {
      for (Event e : events) {
        ATSEvent atsEvent = new ATSEvent();
        atsEvent.setTimestamp(e.getTs());
        atsEvent.setEventType(e.getType());
        atsEvent.setEventInfo(e.getInfo());
        atsEntity.addEvent(atsEvent);
      }
    }
    Map<String, List<String>> relatedEntities = entity.getRelatedEntities();
    if (relatedEntities != null) {
      atsEntity.addRelatedEntities(convertFrom(relatedEntities));
    }
    Map<String, Object> primaryFilters = entity.getPrimaryFilters();
    if (primaryFilters != null) {
      atsEntity.addPrimaryFilters(primaryFilters);
    }
    Map<String, Object> otherInfo = entity.getOtherInfo();
    if (otherInfo != null) {
      atsEntity.addOtherInfo(otherInfo);
    }
    return atsEntity;
  }

  /** Extracts the (id, type) key of a REST-facing entity. */
  private static Entity convertFrom(ATSEntity atsEntity) {
    return new Entity(atsEntity.getEntityId(), atsEntity.getEntityType());
  }

  /**
   * Converts REST-facing events to store-side events. Returns an empty list
   * for a null input (a posted entity may carry no events).
   */
  private static List<Event> convertFrom(List<ATSEvent> atsEvents) {
    List<Event> events = new ArrayList<Event>();
    if (atsEvents != null) {
      for (ATSEvent atsEvent : atsEvents) {
        events.add(new Event(atsEvent.getTimestamp(),
            atsEvent.getEventType(), atsEvent.getEventInfo()));
      }
    }
    return events;
  }

  /**
   * Flattens a type-to-ids map of related entities into a list of store-side
   * entity keys. Returns an empty list for a null input.
   */
  private static List<Entity> convertEntityFrom(
      Map<String, List<Object>> atsEntities) {
    List<Entity> entities = new ArrayList<Entity>();
    if (atsEntities != null) {
      for (Map.Entry<String, List<Object>> atsEntity
          : atsEntities.entrySet()) {
        for (Object value : atsEntity.getValue()) {
          entities.add(new Entity((String) value, atsEntity.getKey()));
        }
      }
    }
    return entities;
  }

  /** Widens a String-valued related-entities map to the Object-valued form. */
  private static Map<String, List<Object>> convertFrom(
      Map<String, List<String>> relatedEntities) {
    Map<String, List<Object>> atsRelatedEntities =
        new HashMap<String, List<Object>>();
    for (Map.Entry<String, List<String>> relatedEntity
        : relatedEntities.entrySet()) {
      atsRelatedEntities.put(relatedEntity.getKey(),
          new ArrayList<Object>(relatedEntity.getValue()));
    }
    return atsRelatedEntities;
  }

  /** Splits a delimited string into a sorted set of trimmed tokens. */
  private static SortedSet<String> parseArrayStr(String str, String delimiter) {
    if (str == null) {
      return null;
    }
    SortedSet<String> strSet = new TreeSet<String>();
    for (String aStr : str.split(delimiter)) {
      strSet.add(aStr.trim());
    }
    return strSet;
  }

  /**
   * Parses a {@code name<delimiter>value} pair.
   *
   * @throws IllegalArgumentException if the delimiter is missing; callers
   *         translate this into HTTP 400. Fix: the original indexed
   *         {@code strs[1]} unconditionally, so a pair without a delimiter
   *         crashed with ArrayIndexOutOfBoundsException (HTTP 500).
   */
  private static NameValuePair parsePairStr(String str, String delimiter) {
    if (str == null) {
      return null;
    }
    String[] strs = str.split(delimiter, 2);
    if (strs.length < 2) {
      throw new IllegalArgumentException(
          "Can't parse a name-value pair from " + str);
    }
    return new NameValuePair(strs[0].trim(), strs[1].trim());
  }

  /** Parses a delimited list of name-value pairs; duplicates collapse. */
  private static Collection<NameValuePair> parsePairsStr(
      String str, String aDelimiter, String pDelimiter) {
    if (str == null) {
      return null;
    }
    Set<NameValuePair> pairs = new HashSet<NameValuePair>();
    for (String aStr : str.split(aDelimiter)) {
      pairs.add(parsePairStr(aStr, pDelimiter));
    }
    return pairs;
  }

  /**
   * Parses a delimited list of {@link Field} names (case-insensitive).
   * Returns null for a null input; unknown names raise
   * IllegalArgumentException from {@link Field#valueOf}, which callers map to
   * HTTP 400. Replaces the original remove-last/EnumSet.of juggling with a
   * single accumulating EnumSet — same results, fewer moving parts.
   */
  private static EnumSet<Field> parseFieldsStr(String str, String delimiter) {
    if (str == null) {
      return null;
    }
    EnumSet<Field> fields = EnumSet.noneOf(Field.class);
    for (String s : str.split(delimiter)) {
      fields.add(Field.valueOf(s.toUpperCase()));
    }
    return fields.isEmpty() ? null : fields;
  }

  /** Parses a nullable trimmed long; NumberFormatException maps to 400. */
  private static Long parseLongStr(String str) {
    return str == null ? null : Long.parseLong(str.trim());
  }

  /** Trims a nullable string. */
  private static String parseStr(String str) {
    return str == null ? null : str.trim();
  }

}