diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index cecc1ce..5aad4fa 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1005,6 +1005,9 @@
public static final String ATS_PREFIX = YARN_PREFIX + "ats.";
+ /** ATS store class */
+ public static final String ATS_STORE = ATS_PREFIX + "store.class";
+
// Redirect ATS web configurations to AHS as ATS REST APIs are hosted by
// AHS web server
/** The address of the ATS web application.*/
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index c493742..642be72 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -588,6 +588,14 @@
org.apache.hadoop.yarn.server.applicationhistoryservice.NullApplicationHistoryStore
+
+
+
+ Store class name for application timeline store
+ yarn.ats.store.class
+ org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.ApplicationTimelineStore
+
+
The hostname of the NM.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
index 3a864c8..f053c5b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
@@ -27,11 +27,13 @@
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.ApplicationTimelineStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
@@ -51,6 +53,7 @@
ApplicationHistoryClientService ahsClientService;
ApplicationHistoryManager historyManager;
+ ApplicationTimelineStore timelineStore;
private WebApp webApp;
public ApplicationHistoryServer() {
@@ -63,6 +66,8 @@ protected void serviceInit(Configuration conf) throws Exception {
ahsClientService = createApplicationHistoryClientService(historyManager);
addService(ahsClientService);
addService((Service) historyManager);
+ timelineStore = createApplicationTimeStore(conf);
+ addIfService(timelineStore);
super.serviceInit(conf);
}
@@ -135,6 +140,15 @@ protected ApplicationHistoryManager createApplicationHistoryManager(
return new ApplicationHistoryManagerImpl();
}
+ protected ApplicationTimelineStore createApplicationTimeStore(
+ Configuration conf) {
+ // TODO: need to replace the first ApplicationTimelineStore.class with the
+ // default class
+ return ReflectionUtils.newInstance(conf.getClass(
+ YarnConfiguration.ATS_STORE, ApplicationTimelineStore.class,
+ ApplicationTimelineStore.class), conf);
+ }
+
protected void startWebApp() {
String bindAddress = WebAppUtils.getAHSWebAppURLWithoutScheme(getConfig());
LOG.info("Instantiating AHSWebApp at " + bindAddress);
@@ -148,7 +162,8 @@ protected void startWebApp() {
YarnConfiguration.AHS_WEBAPP_SPNEGO_USER_NAME_KEY)
.withHttpSpnegoKeytabKey(
YarnConfiguration.AHS_WEBAPP_SPNEGO_KEYTAB_FILE_KEY)
- .at(bindAddress).start(new AHSWebApp(historyManager));
+ .at(bindAddress)
+ .start(new AHSWebApp(historyManager, timelineStore));
} catch (Exception e) {
String msg = "AHSWebApp failed to start.";
LOG.error(msg, e);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java
index 81f8383..d2cfc32 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java
@@ -21,24 +21,31 @@
import org.apache.hadoop.yarn.server.api.ApplicationContext;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManager;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.ApplicationTimelineStore;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.WebApp;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.apache.hadoop.yarn.webapp.YarnWebParams;
public class AHSWebApp extends WebApp implements YarnWebParams {
private final ApplicationHistoryManager applicationHistoryManager;
+ private final ApplicationTimelineStore applicationTimelineStore;
- public AHSWebApp(ApplicationHistoryManager applicationHistoryManager) {
+ public AHSWebApp(ApplicationHistoryManager applicationHistoryManager,
+ ApplicationTimelineStore applicationTimelineStore) {
this.applicationHistoryManager = applicationHistoryManager;
+ this.applicationTimelineStore = applicationTimelineStore;
}
@Override
public void setup() {
- bind(JAXBContextResolver.class);
+ bind(YarnJacksonJaxbJsonProvider.class);
bind(AHSWebServices.class);
+ bind(ATSWebServices.class);
bind(GenericExceptionHandler.class);
bind(ApplicationContext.class).toInstance(applicationHistoryManager);
+ bind(ApplicationTimelineStore.class).toInstance(applicationTimelineStore);
route("/", AHSController.class);
route(pajoin("/apps", APP_STATE), AHSController.class);
route(pajoin("/app", APPLICATION_ID), AHSController.class, "app");
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/ATSWebServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/ATSWebServices.java
new file mode 100644
index 0000000..e589ef0
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/ATSWebServices.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
+import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
+import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvent;
+import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvents;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.ApplicationTimelineReader.Field;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.ApplicationTimelineStore;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.Entity;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.EntityInfo;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.Event;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.NameValuePair;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.TimelinePutRequest;
+import org.apache.hadoop.yarn.webapp.BadRequestException;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+@Path("/ws/v1/apptimeline")
+public class ATSWebServices {
+
+ private ApplicationTimelineStore store;
+
+ @Inject
+ public ATSWebServices(ApplicationTimelineStore store) {
+ this.store = store;
+ }
+
+ @XmlRootElement(name = "about")
+ @XmlAccessorType(XmlAccessType.NONE)
+ @Public
+ @Unstable
+ public static class AboutInfo {
+
+ private String about;
+
+ public AboutInfo() {
+
+ }
+
+ public AboutInfo(String about) {
+ this.about = about;
+ }
+
+ @XmlElement(name = "About")
+ public String getAbout() {
+ return about;
+ }
+
+ public void setAbout(String about) {
+ this.about = about;
+ }
+
+ }
+
+ @GET
+ @Path("/")
+ @Produces({ MediaType.APPLICATION_JSON/*, MediaType.APPLICATION_XML*/ })
+ public AboutInfo about(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res) {
+ init(res);
+ return new AboutInfo("Application Timeline API");
+ }
+
+ @GET
+ @Path("/{entityType}")
+ @Produces({ MediaType.APPLICATION_JSON/*, MediaType.APPLICATION_XML*/ })
+ public ATSEntities getEntities(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ @PathParam("entityType") String entityType,
+ @QueryParam("primaryFilter") String primaryFilter,
+ @QueryParam("secondaryFilter") String secondaryFilter,
+ @QueryParam("windowStart") String windowStart,
+ @QueryParam("windowEnd") String windowEnd,
+ @QueryParam("limit") String limit,
+ @QueryParam("fields") String fields) {
+ init(res);
+ List timeline = null;
+ try {
+ timeline = store.getEntities(
+ parseStr(entityType),
+ parseLongStr(limit),
+ parseLongStr(windowStart),
+ parseLongStr(windowEnd),
+ parsePairStr(primaryFilter, ":"),
+ parsePairsStr(secondaryFilter, ",", ":"),
+ parseFieldsStr(fields, ","));
+ } catch (NumberFormatException e) {
+ throw new BadRequestException(
+ "windowStart, windowEnd or limit is not a numeric value.");
+ } catch (IllegalArgumentException e) {
+ throw new BadRequestException(
+ "requested invalid field.");
+ }
+ if (timeline == null || timeline.isEmpty()) {
+ return new ATSEntities();
+ }
+ ATSEntities atsEntities = new ATSEntities();
+ for (EntityInfo entity : timeline) {
+ atsEntities.addEntity(convertFrom(entity));
+ }
+ return atsEntities;
+ }
+
+ @GET
+ @Path("/{entityType}/{entityId}")
+ @Produces({ MediaType.APPLICATION_JSON/*, MediaType.APPLICATION_XML*/ })
+ public ATSEntity getEntity(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ @PathParam("entityType") String entityType,
+ @PathParam("entityId") String entityId,
+ @QueryParam("fields") String fields) {
+ init(res);
+ Entity entity = new Entity(parseStr(entityId), parseStr(entityType));
+ EntityInfo entityInfo = null;
+ try {
+ entityInfo = store.getEntityInfo(entity, parseFieldsStr(fields, ","));
+ } catch (IllegalArgumentException e) {
+ throw new BadRequestException(
+ "requested invalid field.");
+ }
+ if (entityInfo == null) {
+ throw new WebApplicationException(Response.Status.NOT_FOUND);
+ }
+ return convertFrom(entityInfo);
+ }
+
+ @GET
+ @Path("/{entityType}/events")
+ @Produces({ MediaType.APPLICATION_JSON/*, MediaType.APPLICATION_XML*/ })
+ public ATSEvents getEvents(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ @PathParam("entityType") String entityType,
+ @QueryParam("entityId") String entityId,
+ @QueryParam("eventType") String eventType,
+ @QueryParam("windowStart") String windowStart,
+ @QueryParam("windowEnd") String windowEnd,
+ @QueryParam("limit") String limit) {
+ init(res);
+ Map> timeline = null;
+ try {
+ timeline = store.getEntityTimelines(
+ parseStr(entityType),
+ parseArrayStr(entityId, ","),
+ parseLongStr(limit),
+ parseLongStr(windowStart),
+ parseLongStr(windowEnd),
+ parseArrayStr(eventType, ","));
+ } catch (NumberFormatException e) {
+ throw new BadRequestException(
+ "windowStart, windowEnd or limit is not a numeric value.");
+ }
+ if (timeline == null || timeline.isEmpty()) {
+ return new ATSEvents();
+ }
+ ATSEvents atsTimeline = new ATSEvents();
+ for (Map.Entry> events : timeline.entrySet()) {
+ ATSEvents.ATSEventsOfOneEntity atsEvents =
+ new ATSEvents.ATSEventsOfOneEntity();
+ atsEvents.setEntityId(events.getKey().getId());
+ atsEvents.setEntityType(events.getKey().getType());
+ for (Event event : events.getValue()) {
+ ATSEvent atsEvent = new ATSEvent();
+ atsEvent.setTimestamp(event.getTs());
+ atsEvent.setEventType(event.getType());
+ atsEvent.addEventInfo(event.getInfo());
+ atsEvents.addEntity(atsEvent);
+ }
+ atsTimeline.addEvent(atsEvents);
+ }
+ return atsTimeline;
+ }
+
+ @POST
+ @Path("/")
+ @Consumes({ MediaType.APPLICATION_JSON/*, MediaType.APPLICATION_XML*/ })
+ public Response postEntities(
+ ATSEntities entities) {
+ List requests = new ArrayList();
+ for (ATSEntity entity : entities.getEntities()) {
+ TimelinePutRequest request = TimelinePutRequest.newInstance(
+ convertFrom(entity), entity.getKeyTs(),
+ convertFrom(entity.getEvents()),
+ convertEntityFrom(entity.getRelatedEntities()),
+ entity.getPrimaryFilters(),
+ entity.getOtherInfo());
+ requests.add(request);
+ }
+ store.put(requests);
+ // TODO: return error codes
+ return Response.status(Response.Status.OK).build();
+ }
+
+ private void init(HttpServletResponse response) {
+ response.setContentType(null);
+ }
+
+ private static ATSEntity convertFrom(EntityInfo entity) {
+ ATSEntity atsEntity = new ATSEntity();
+ atsEntity.setEntityId(entity.getEntity().getId());
+ atsEntity.setEntityType(entity.getEntity().getType());
+ atsEntity.setKeyTs(entity.getKeyTs());
+ List events = entity.getEvents();
+ if (events != null) {
+ for (Event e : events) {
+ ATSEvent atsEvent = new ATSEvent();
+ atsEvent.setTimestamp(e.getTs());
+ atsEvent.setEventType(e.getType());
+ atsEvent.setEventInfo(e.getInfo());
+ atsEntity.addEvent(atsEvent);
+ }
+ }
+ Map> relatedEntities = entity.getRelatedEntities();
+ if (relatedEntities != null)
+ atsEntity.addRelatedEntities(convertFrom(relatedEntities));
+ Map primaryFilters = entity.getPrimaryFilters();
+ if (primaryFilters != null)
+ atsEntity.addPrimaryFilters(primaryFilters);
+ Map otherInfo = entity.getOtherInfo();
+ if (otherInfo != null)
+ atsEntity.addOtherInfo(otherInfo);
+ return atsEntity;
+ }
+
+ private static Entity convertFrom(ATSEntity atsEntity) {
+ return new Entity(atsEntity.getEntityId(), atsEntity.getEntityType());
+ }
+
+ private static List convertFrom(List atsEvents) {
+ List events = new ArrayList();
+ for (ATSEvent atsEvent : atsEvents) {
+ Event event = new Event(atsEvent.getTimestamp(), atsEvent.getEventType(),
+ atsEvent.getEventInfo());
+ events.add(event);
+ }
+ return events;
+ }
+
+ private static List convertEntityFrom(
+ Map> atsEntities) {
+ List entities = new ArrayList();
+ for (Map.Entry> atsEntity : atsEntities.entrySet()) {
+ for (Object value : atsEntity.getValue()) {
+ Entity entity = new Entity((String) value, atsEntity.getKey());
+ entities.add(entity);
+ }
+ }
+ return entities;
+ }
+
+ private static Map> convertFrom(
+ Map> relatedEntities) {
+ Map> atsRelatedEntities =
+ new HashMap>();
+ for (Map.Entry> relatedEntity : relatedEntities
+ .entrySet()) {
+ List