diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowEntity.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowEntity.java index 0765f00..24d125a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowEntity.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowEntity.java @@ -31,6 +31,7 @@ @InterfaceStability.Unstable public class FlowEntity extends HierarchicalTimelineEntity { private String user; + private String name; private String version; private String run; @@ -38,18 +39,23 @@ public FlowEntity() { super(TimelineEntityType.YARN_FLOW.toString()); } + @XmlElement(name = "id") @Override public String getId() { - //Flow id schema: user@flow_name(or id)/version/run - StringBuilder sb = new StringBuilder(); - sb.append(user); - sb.append('@'); - sb.append(super.getId()); - sb.append('/'); - sb.append(version); - sb.append('/'); - sb.append(run); - return sb.toString(); + if (super.getId() != null) { + return super.getId(); + } else { + //Flow id schema: user@flow_name(or id)/version/run + StringBuilder sb = new StringBuilder(); + sb.append(user); + sb.append('@'); + sb.append(name); + sb.append('/'); + sb.append(version); + sb.append('/'); + sb.append(run); + return sb.toString(); + } } @XmlElement(name = "user") @@ -61,6 +67,15 @@ public void setUser(String user) { this.user = user; } + @XmlElement(name = "name") + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + @XmlElement(name = "version") public String getVersion() { return version; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/HierarchicalTimelineEntity.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/HierarchicalTimelineEntity.java index 01d85cf..8ffc159 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/HierarchicalTimelineEntity.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/HierarchicalTimelineEntity.java @@ -35,6 +35,10 @@ private Identifier parent; private HashMap> children = new HashMap<>(); + HierarchicalTimelineEntity() { + super(); + } + HierarchicalTimelineEntity(String type) { super(type); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index f70cf48..29a9747 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -352,12 +353,6 @@ public void putEntitiesAsync( private void putEntities(boolean async, org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities) throws IOException, YarnException { - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities - entitiesContainer = - new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities(); - for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity : entities) { - entitiesContainer.addEntity(entity); - } MultivaluedMap params = new MultivaluedMapImpl(); if (contextAppId != null) { params.add("appid", contextAppId.toString()); @@ -365,7 +360,15 @@ private void putEntities(boolean async, if (async) { params.add("async", Boolean.TRUE.toString()); } - putObjects("entities", params, entitiesContainer); + for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity : entities) { + String path = "entities"; + try { + path += "/" + TimelineEntityType.valueOf(entity.getType()).toString(); + } catch (IllegalArgumentException e) { + // Do nothing, generic entity type + } + putObjects(path, params, entity); + } } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java index 4f8ab94..c50ceaa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java @@ -98,13 +98,13 @@ public void testFirstClassCitizenEntities() throws Exception { cluster.setId("test cluster id"); FlowEntity flow1 = new FlowEntity(); - flow1.setId("test flow id"); + flow1.setName("test flow id"); flow1.setUser(user.getId()); flow1.setVersion("test flow version"); flow1.setRun("test run 1"); FlowEntity flow2 = new FlowEntity(); - flow2.setId("test flow run id2"); + flow2.setName("test flow run id2"); flow2.setUser(user.getId()); flow1.setVersion("test flow version2"); flow2.setRun("test run 2"); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java index c8b9625..50b7a3e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java @@ -24,9 +24,13 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import org.apache.commons.math3.stat.clustering.Cluster; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.timelineservice.*; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -35,6 +39,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -85,6 +90,52 @@ public void testPutEntities() throws Exception { } } + @Test + public void testPutExtendedEntities() throws Exception { + ApplicationId appId = ApplicationId.newInstance(0, 1); + TimelineClient client = + TimelineClient.createTimelineClient(appId); + try { + // set the timeline service address manually + client.setTimelineServiceAddress( + collectorManager.getRestServerBindAddress()); + client.init(new YarnConfiguration()); + client.start(); + ClusterEntity cluster = new ClusterEntity(); + cluster.setId(YarnConfiguration.DEFAULT_RM_CLUSTER_ID); + client.putEntities(cluster); + client.putEntitiesAsync(cluster); + FlowEntity flow = new FlowEntity(); + flow.setId(TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId)); + client.putEntities(flow); + client.putEntitiesAsync(flow); + ApplicationEntity app = new ApplicationEntity(); + app.setId(appId.toString()); + client.putEntities(app); + client.putEntitiesAsync(app); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); + ApplicationAttemptEntity appAttempt = new ApplicationAttemptEntity(); + appAttempt.setId(attemptId.toString()); + client.putEntities(appAttempt); + client.putEntitiesAsync(appAttempt); + ContainerId containerId = ContainerId.newContainerId(attemptId, 1); + ContainerEntity container = new ContainerEntity(); + container.setId(containerId.toString()); + client.putEntities(container); + client.putEntitiesAsync(container); + TimelineUser user = new TimelineUser(); + user.setId(UserGroupInformation.getCurrentUser().getShortUserName()); + client.putEntities(user); + client.putEntitiesAsync(user); + TimelineQueue queue = new TimelineQueue(); + queue.setId("default_queue"); + client.putEntities(queue); + client.putEntitiesAsync(queue); + } finally { + client.stop(); + } + } + private static class MyTimelineCollectorManager extends TimelineCollectorManager { public MyTimelineCollectorManager() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java index 677feb1..7ab14e5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -28,6 +28,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -84,20 +85,22 @@ public TimelineWriter getWriter() { * * This method should be reserved for selected critical entities and events. * For normal voluminous writes one should use the async method - * {@link #putEntitiesAsync(TimelineEntities, UserGroupInformation)}. + * {@link #putEntityAsync(TimelineEntity, UserGroupInformation)}. * - * @param entities entities to post + * @param entity entity to post * @param callerUgi the caller UGI * @return the response that contains the result of the post. */ - public TimelineWriteResponse putEntities(TimelineEntities entities, + public TimelineWriteResponse putEntity(TimelineEntity entity, UserGroupInformation callerUgi) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("SUCCESS - TIMELINE V2 PROTOTYPE"); - LOG.debug("putEntities(entities=" + entities + ", callerUgi=" + LOG.debug("putEntity(entities=" + entity + ", callerUgi=" + callerUgi + ")"); } + TimelineEntities entities = new TimelineEntities(); + entities.addEntity(entity); TimelineCollectorContext context = getTimelineEntityContext(); return writer.write(context.getClusterId(), context.getUserId(), context.getFlowId(), context.getFlowRunId(), context.getAppId(), @@ -111,15 +114,14 @@ public TimelineWriteResponse putEntities(TimelineEntities entities, * backing storage. Multiple writes to the same entities may be batched and * appropriate values updated and result in fewer writes to the backing * storage. - * - * @param entities entities to post + * @param entity entity to post * @param callerUgi the caller UGI */ - public void putEntitiesAsync(TimelineEntities entities, + public void putEntityAsync(TimelineEntity entity, UserGroupInformation callerUgi) { // TODO implement if (LOG.isDebugEnabled()) { - LOG.debug("putEntitiesAsync(entities=" + entities + ", callerUgi=" + + LOG.debug("putEntityAsync(entities=" + entity + ", callerUgi=" + callerUgi + ")"); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java index 0f51656..d3281d3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java @@ -42,7 +42,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.*; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.webapp.ForbiddenException; import org.apache.hadoop.yarn.webapp.NotFoundException; @@ -110,13 +110,106 @@ public AboutInfo about( */ @PUT @Path("/entities") - @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) - public Response putEntities( + @Consumes(MediaType.APPLICATION_JSON) + public Response putEntity( @Context HttpServletRequest req, @Context HttpServletResponse res, @QueryParam("async") String async, @QueryParam("appid") String appId, - TimelineEntities entities) { + TimelineEntity entity) { + return putEntityInternal(req, res, async, appId, entity); + } + + @PUT + @Path("/entities/YARN_CLUSTER") + @Consumes(MediaType.APPLICATION_JSON) + public Response putContainerEntity( + @Context HttpServletRequest req, + @Context HttpServletResponse res, + @QueryParam("async") String async, + @QueryParam("appid") String appId, + ClusterEntity entity) { + return putEntityInternal(req, res, async, appId, entity); + } + + @PUT + @Path("/entities/YARN_FLOW") + @Consumes(MediaType.APPLICATION_JSON) + public Response putContainerEntity( + @Context HttpServletRequest req, + @Context HttpServletResponse res, + @QueryParam("async") String async, + @QueryParam("appid") String appId, + FlowEntity entity) { + return putEntityInternal(req, res, async, appId, entity); + } + + @PUT + @Path("/entities/YARN_APPLICATION") + @Consumes(MediaType.APPLICATION_JSON) + public Response putContainerEntity( + @Context HttpServletRequest req, + @Context HttpServletResponse res, + @QueryParam("async") String async, + @QueryParam("appid") String appId, + ApplicationEntity entity) { + return putEntityInternal(req, res, async, appId, entity); + } + + @PUT + @Path("/entities/YARN_APPLICATION_ATTEMPT") + @Consumes(MediaType.APPLICATION_JSON) + public Response putContainerEntity( + @Context HttpServletRequest req, + @Context HttpServletResponse res, + @QueryParam("async") String async, + @QueryParam("appid") String appId, + ApplicationAttemptEntity entity) { + return putEntityInternal(req, res, async, appId, entity); + } + + @PUT + @Path("/entities/YARN_CONTAINER") + @Consumes(MediaType.APPLICATION_JSON) + public Response putContainerEntity( + @Context HttpServletRequest req, + @Context HttpServletResponse res, + @QueryParam("async") String async, + @QueryParam("appid") String appId, + ContainerEntity entity) { + return putEntityInternal(req, res, async, appId, entity); + } + + @PUT + @Path("/entities/YARN_USER") + @Consumes(MediaType.APPLICATION_JSON) + public Response putContainerEntity( + @Context HttpServletRequest req, + @Context HttpServletResponse res, + @QueryParam("async") String async, + @QueryParam("appid") String appId, + TimelineUser entity) { + return putEntityInternal(req, res, async, appId, entity); + } + + @PUT + @Path("/entities/YARN_QUEUE") + @Consumes(MediaType.APPLICATION_JSON) + public Response putContainerEntity( + @Context HttpServletRequest req, + @Context HttpServletResponse res, + @QueryParam("async") String async, + @QueryParam("appid") String appId, + TimelineQueue entity) { + return putEntityInternal(req, res, async, appId, entity); + } + + private Response putEntityInternal( + @Context HttpServletRequest req, + @Context HttpServletResponse res, + @QueryParam("async") String async, + @QueryParam("appid") String appId, + TimelineEntity entity) { init(res); UserGroupInformation callerUgi = getUser(req); if (callerUgi == null) { @@ -138,7 +231,7 @@ public Response putEntities( LOG.error("Application not found"); throw new NotFoundException(); // different exception? } - collector.putEntities(entities, callerUgi); + collector.putEntity(entity, callerUgi); return Response.ok().build(); } catch (Exception e) { LOG.error("Error putting entities", e);