diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java index 2fc3033..1ba9911 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -133,19 +133,35 @@ protected boolean isReadyToAggregate() { public TimelineWriteResponse putEntities(TimelineEntities entities, UserGroupInformation callerUgi) throws IOException { if (LOG.isDebugEnabled()) { - LOG.debug("SUCCESS - TIMELINE V2 PROTOTYPE"); LOG.debug("putEntities(entities=" + entities + ", callerUgi=" + callerUgi + ")"); } - TimelineCollectorContext context = getTimelineEntityContext(); + TimelineWriteResponse response = writeTimelineEntitiesAsync(entities); + persistOutstandingTimelineEntities(); + + return response; + } + + private TimelineWriteResponse writeTimelineEntitiesAsync( + TimelineEntities entities) throws IOException { // Update application metrics for aggregation updateAggregateStatus(entities, aggregationGroups, getEntityTypesSkipAggregation()); + final TimelineCollectorContext context = getTimelineEntityContext(); return writer.write(context.getClusterId(), context.getUserId(), - context.getFlowName(), context.getFlowVersion(), context.getFlowRunId(), - context.getAppId(), entities); + context.getFlowName(), context.getFlowVersion(), + context.getFlowRunId(), context.getAppId(), entities); + } + + /** + * Flush the TimelineWriter to persist any TimelineEntities in the buffer. + * @throws IOException if there is any exception encountered while putting + * buffered entities. + */ + private void persistOutstandingTimelineEntities() throws IOException { + writer.flush(); } /** @@ -158,14 +174,17 @@ public TimelineWriteResponse putEntities(TimelineEntities entities, * * @param entities entities to post * @param callerUgi the caller UGI + * @throws IOException if there is any exception encounted while putting + * entities. */ public void putEntitiesAsync(TimelineEntities entities, - UserGroupInformation callerUgi) { - // TODO implement + UserGroupInformation callerUgi) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("putEntitiesAsync(entities=" + entities + ", callerUgi=" + callerUgi + ")"); } + + writeTimelineEntitiesAsync(entities); } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java index f36c636..29be878 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java @@ -152,9 +152,6 @@ public Response putEntities( throw new ForbiddenException(msg); } - // TODO how to express async posts and handle them - boolean isAsync = async != null && async.trim().equalsIgnoreCase("true"); - try { ApplicationId appID = parseApplicationId(appId); if (appID == null) { @@ -169,7 +166,14 @@ public Response putEntities( throw new NotFoundException(); // different exception? } - collector.putEntities(processTimelineEntities(entities), callerUgi); + boolean isAsync = async != null && async.trim().equalsIgnoreCase("true"); + if (isAsync) { + collector.putEntities(processTimelineEntities(entities), callerUgi); + } else { + collector.putEntitiesAsync( + processTimelineEntities(entities), callerUgi); + } + return Response.ok().build(); } catch (Exception e) { LOG.error("Error putting entities", e); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java index 5b4dc50..a55f227 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java @@ -18,17 +18,27 @@ package org.apache.hadoop.yarn.server.timelineservice.collector; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; import org.junit.Test; +import java.io.IOException; import java.util.HashSet; import java.util.Set; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; public class TestTimelineCollector { @@ -124,4 +134,57 @@ public void testAggregation() throws Exception { } } + + /** + * Test TimelineCollector's interaction with TimelineWriter upon + * putEntity() calls. + */ + @Test + public void testPutEntity() throws IOException { + TimelineWriter writer = mock(TimelineWriter.class); + TimelineCollector collector = new TimelineCollectorForTest(writer); + + TimelineEntities entities = generateTestEntities(1, 1); + collector.putEntities( + entities, UserGroupInformation.createRemoteUser("test-user")); + + verify(writer, times(1)).write( + anyString(), anyString(), anyString(), anyString(), anyLong(), + anyString(), any(TimelineEntities.class)); + verify(writer, times(1)).flush(); + } + + /** + * Test TimelineCollector's interaction with TimelineWriter upon + * putEntityAsync() calls. + */ + @Test + public void testPutEntityAsync() throws IOException { + TimelineWriter writer = mock(TimelineWriter.class); + TimelineCollector collector = new TimelineCollectorForTest(writer); + + TimelineEntities entities = generateTestEntities(1, 1); + collector.putEntitiesAsync( + entities, UserGroupInformation.createRemoteUser("test-user")); + + verify(writer, times(1)).write( + anyString(), anyString(), anyString(), anyString(), anyLong(), + anyString(), any(TimelineEntities.class)); + verify(writer, never()).flush(); + } + + private static class TimelineCollectorForTest extends TimelineCollector { + private final TimelineCollectorContext context = + new TimelineCollectorContext(); + + TimelineCollectorForTest(TimelineWriter writer) { + super("TimelineCollectorForTest"); + setWriter(writer); + } + + @Override + public TimelineCollectorContext getTimelineEntityContext() { + return context; + } + } }