diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 3a19ac2..2774860 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -541,7 +541,7 @@ public boolean init(String[] args) throws ParseException, IOException { // Creating the Timeline Client if (newTimelineService) { timelineClient = TimelineClient.createTimelineClient( - appAttemptID.getApplicationId()); + null, null, appAttemptID.getApplicationId()); } else { timelineClient = TimelineClient.createTimelineClient(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 97d9168..3b0abf2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -52,6 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeTimelineAggregatorsAuxService; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -320,15 +322,21 @@ private void checkTimelineV1(boolean haveDomain) throws Exception { } } - private void checkTimelineV2(boolean haveDomain, ApplicationId appId) { - // For PoC check in /tmp/ YARN-3264 - String tmpRoot = FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT; + private void checkTimelineV2(boolean haveDomain, ApplicationId appId) + throws IOException { + // For PoC check in /tmp/timeline_service_data YARN-3264 + String tmpRoot = FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT + + "entities/"; File tmpRootFolder = new File(tmpRoot); Assert.assertTrue(tmpRootFolder.isDirectory()); // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs - String outputDirApp = tmpRoot + "/DS_APP_ATTEMPT/"; + String outputDirApp = tmpRoot + + TimelineUtils.generateDefaultClusterIdBasedOnAppId(appId) + "/" + + UserGroupInformation.getCurrentUser().getShortUserName() + "/" + + TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId) + "/0/" + + appId.toString() + "/DS_APP_ATTEMPT/"; File entityFolder = new File(outputDirApp); Assert.assertTrue(entityFolder.isDirectory()); @@ -341,7 +349,11 @@ private void checkTimelineV2(boolean haveDomain, ApplicationId appId) { File appAttemptFile = new File(appAttemptFileName); Assert.assertTrue(appAttemptFile.exists()); - String outputDirContainer = tmpRoot + "/DS_CONTAINER/"; + String outputDirContainer = tmpRoot + + TimelineUtils.generateDefaultClusterIdBasedOnAppId(appId) + "/" + + UserGroupInformation.getCurrentUser().getShortUserName() + "/" + + TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId) + "/0/" + + appId.toString() + "/DS_CONTAINER/"; File containerFolder = new File(outputDirContainer); Assert.assertTrue(containerFolder.isDirectory()); @@ -355,7 +367,7 @@ private void checkTimelineV2(boolean haveDomain, ApplicationId appId) { + "_"; deleteAppFiles(new File(outputDirApp), appTimeStamp); deleteAppFiles(new File(outputDirContainer), appTimeStamp); - tmpRootFolder.delete(); + new File(tmpRootFolder.getParent()).delete(); } private void deleteAppFiles(File rootDir, String appTimeStamp) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java index 5db347e..9e1ce98 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java @@ -20,9 +20,11 @@ import java.io.IOException; +import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -30,8 +32,10 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; /** * A client library that can be used to post some information in terms of a @@ -41,6 +45,9 @@ @Unstable public abstract class TimelineClient extends AbstractService { + protected String contextClusterId; + protected String contextFlowId; + protected String contextFlowRunId; protected ApplicationId contextAppId; @Public @@ -50,14 +57,25 @@ public static TimelineClient createTimelineClient() { } @Public - public static TimelineClient createTimelineClient(ApplicationId appId) { - TimelineClient client = new TimelineClientImpl(appId); + public static TimelineClient createTimelineClient( + String flowId, String flowRunId, ApplicationId appId) { + TimelineClient client = + new TimelineClientImpl(flowId, flowRunId, appId); return client; } @Private - protected TimelineClient(String name, ApplicationId appId) { + protected TimelineClient(String name) { super(name); + } + + @Private + protected TimelineClient( + String name, String flowId,String flowRunId, ApplicationId appId) { + super(name); + Preconditions.checkNotNull(appId, "Context app ID is not set"); + contextFlowId = flowId; + contextFlowRunId = flowRunId; contextAppId = appId; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index 407682d..541ea2a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; import org.codehaus.jackson.map.ObjectMapper; @@ -264,15 +265,28 @@ public boolean shouldRetryOn(Exception e) { } public TimelineClientImpl() { - super(TimelineClientImpl.class.getName(), null); + super(TimelineClientImpl.class.getName()); } - public TimelineClientImpl(ApplicationId applicationId) { - super(TimelineClientImpl.class.getName(), applicationId); + public TimelineClientImpl( + String flowId, String flowRunId, ApplicationId applicationId) { + super(TimelineClientImpl.class.getName(), flowId, flowRunId, applicationId); this.newTimelineService = true; } protected void serviceInit(Configuration conf) throws Exception { + if (newTimelineService) { + contextClusterId = conf.get(YarnConfiguration.RM_CLUSTER_ID, + TimelineUtils.generateDefaultClusterIdBasedOnAppId(contextAppId)); + if (contextFlowId == null || contextFlowId.isEmpty()) { + contextFlowId = + TimelineUtils.generateDefaultFlowIdBasedOnAppId(contextAppId); + } + if (contextFlowRunId == null || contextFlowRunId.isEmpty()) { + contextFlowRunId = "0"; + } + } + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); UserGroupInformation realUgi = ugi.getRealUser(); if (realUgi != null) { @@ -358,40 +372,50 @@ private void putEntities(boolean async, for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity : entities) { entitiesContainer.addEntity(entity); } - MultivaluedMap params = new MultivaluedMapImpl(); - if (contextAppId != null) { - params.add("appid", contextAppId.toString()); - } - if (async) { - params.add("async", Boolean.TRUE.toString()); - } - putObjects("entities", params, entitiesContainer); - } - @Override - public void putDomain(TimelineDomain domain) throws IOException, - YarnException { - doPosting(domain, "domain"); - } - - // Used for new timeline service only - @Private - public void putObjects(String path, MultivaluedMap params, - Object obj) throws IOException, YarnException { - // timelineServiceAddress could haven't be initialized yet // or stale (only for new timeline service) int retries = pollTimelineServiceAddress(this.maxServiceRetries); - + // timelineServiceAddress could be stale, add retry logic here. boolean needRetry = true; while (needRetry) { try { - URI uri = constructResURI(getConfig(), timelineServiceAddress, true); - putObjects(uri, path, params, obj); + ClientResponse resp; + try { + resp = client + .resource(constructResURI(getConfig(), timelineServiceAddress, true)) + .path("entities") + .path(contextClusterId) + .path(authUgi.getShortUserName()) + .path(contextFlowId) + .path(contextFlowRunId) + .path(contextAppId.toString()) + .queryParam("async", String.valueOf(async)) + .accept(MediaType.APPLICATION_JSON) + .type(MediaType.APPLICATION_JSON) + .put(ClientResponse.class, entitiesContainer); + } catch (RuntimeException re) { + // runtime exception is expected if the client cannot connect the server + String msg = + "Failed to get the response from the timeline server."; + LOG.error(msg, re); + throw new IOException(re); + } + if (resp == null || + resp.getClientResponseStatus() != ClientResponse.Status.OK) { + String msg = + "Failed to get the response from the timeline server."; + LOG.error(msg); + if (LOG.isDebugEnabled() && resp != null) { + String output = resp.getEntity(String.class); + LOG.debug("HTTP error code: " + resp.getStatus() + + " Server response:\n" + output); + } + throw new YarnException(msg); + } needRetry = false; - } - catch (Exception e) { + } catch (Exception e) { // TODO only handle exception for timelineServiceAddress being updated. // skip retry for other exceptions. checkRetryWithSleep(retries, e); @@ -399,7 +423,7 @@ public void putObjects(String path, MultivaluedMap params, } } } - + /** * Check if reaching to maximum of retries. * @param retries @@ -428,34 +452,10 @@ private void checkRetryWithSleep(int retries, Exception e) throws } } - private void putObjects( - URI base, String path, MultivaluedMap params, Object obj) - throws IOException, YarnException { - ClientResponse resp; - try { - resp = client.resource(base).path(path).queryParams(params) - .accept(MediaType.APPLICATION_JSON) - .type(MediaType.APPLICATION_JSON) - .put(ClientResponse.class, obj); - } catch (RuntimeException re) { - // runtime exception is expected if the client cannot connect the server - String msg = - "Failed to get the response from the timeline server."; - LOG.error(msg, re); - throw new IOException(re); - } - if (resp == null || - resp.getClientResponseStatus() != ClientResponse.Status.OK) { - String msg = - "Failed to get the response from the timeline server."; - LOG.error(msg); - if (LOG.isDebugEnabled() && resp != null) { - String output = resp.getEntity(String.class); - LOG.debug("HTTP error code: " + resp.getStatus() - + " Server response:\n" + output); - } - throw new YarnException(msg); - } + @Override + public void putDomain(TimelineDomain domain) throws IOException, + YarnException { + doPosting(domain, "domain"); } private ClientResponse doPosting(final Object obj, final String path) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java index 02b5eb4..903713d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; import org.codehaus.jackson.JsonGenerationException; @@ -49,9 +50,8 @@ /** * Serialize a POJO object into a JSON string not in a pretty format - * - * @param o - * an object to serialize + * + * @param o an object to serialize * @return a JSON string * @throws IOException * @throws JsonMappingException @@ -64,11 +64,9 @@ public static String dumpTimelineRecordtoJSON(Object o) /** * Serialize a POJO object into a JSON string - * - * @param o - * an object to serialize - * @param pretty - * whether in a pretty format or not + * + * @param o an object to serialize + * @param pretty whether in a pretty format or not * @return a JSON string * @throws IOException * @throws JsonMappingException @@ -105,4 +103,13 @@ public static Text buildTimelineTokenService(Configuration conf) { getTimelineTokenServiceAddress(conf); return SecurityUtil.buildTokenService(timelineServiceAddr); } + + public static String generateDefaultFlowIdBasedOnAppId(ApplicationId appId) { + return "flow_" + appId.getClusterTimestamp() + "_" + appId.getId(); + } + + public static String generateDefaultClusterIdBasedOnAppId( + ApplicationId appId) { + return "cluster_" + appId.getClusterTimestamp(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java index 32ee5d8..31df1db 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java @@ -1,6 +1,7 @@ package org.apache.hadoop.yarn.server.timelineservice; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; @@ -20,6 +21,7 @@ public static void setupClass() throws Exception { try { auxService = PerNodeTimelineAggregatorsAuxService.launchServer(new String[0]); + auxService.aggregatorCollection.isUnitTest = true; auxService.addApplication(ApplicationId.newInstance(0, 1)); } catch (ExitUtil.ExitException e) { fail(); @@ -35,11 +37,15 @@ public static void tearDownClass() throws Exception { @Test public void testPutEntities() throws Exception { - TimelineClient client = - TimelineClient.createTimelineClient(ApplicationId.newInstance(0, 1)); + Configuration conf = new YarnConfiguration(); + TimelineClient client = TimelineClient.createTimelineClient( + null, null, ApplicationId.newInstance(0, 1)); try { - client.init(new YarnConfiguration()); + client.init(conf); client.start(); + client.setTimelineServiceAddress(conf.get( + YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS)); TimelineEntity entity = new TimelineEntity(); entity.setType("test entity type"); entity.setId("test entity id"); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java index 19920fd..8000855 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java @@ -53,7 +53,8 @@ LogFactory.getLog(PerNodeTimelineAggregatorsAuxService.class); private static final int SHUTDOWN_HOOK_PRIORITY = 30; - private final TimelineAggregatorsCollection aggregatorCollection; + @VisibleForTesting + public final TimelineAggregatorsCollection aggregatorCollection; public PerNodeTimelineAggregatorsAuxService() { // use the same singleton diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java index dbd0895..03ece4c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -83,21 +84,27 @@ public TimelineWriter getWriter() { * * This method should be reserved for selected critical entities and events. * For normal voluminous writes one should use the async method - * {@link #postEntitiesAsync(TimelineEntities, UserGroupInformation)}. + * {@link #putEntitiesAsync(String, String, String, String, String, TimelineEntities, UserGroupInformation)}. * + * @param clusterId context cluster ID + * @param userId context user ID + * @param flowId context flow ID + * @param flowRunId context flow run ID + * @param appId context app ID * @param entities entities to post * @param callerUgi the caller UGI * @return the response that contains the result of the post. */ - public TimelineWriteResponse postEntities(TimelineEntities entities, + public TimelineWriteResponse putEntities(String clusterId, String userId, + String flowId, String flowRunId, String appId, TimelineEntities entities, UserGroupInformation callerUgi) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("SUCCESS - TIMELINE V2 PROTOTYPE"); - LOG.debug("postEntities(entities=" + entities + ", callerUgi=" + LOG.debug("putEntities(entities=" + entities + ", callerUgi=" + callerUgi + ")"); } - return writer.write(entities); + return writer.write(clusterId, userId, flowId, flowRunId, appId, entities); } /** @@ -108,14 +115,20 @@ public TimelineWriteResponse postEntities(TimelineEntities entities, * appropriate values updated and result in fewer writes to the backing * storage. * + * @param clusterId context cluster ID + * @param userId context user ID + * @param flowId context flow ID + * @param flowRunId context flow run ID + * @param appId context app ID * @param entities entities to post * @param callerUgi the caller UGI */ - public void postEntitiesAsync(TimelineEntities entities, + public void putEntitiesAsync(String clusterId, String userId, + String flowId, String flowRunId, String appId, TimelineEntities entities, UserGroupInformation callerUgi) { // TODO implement if (LOG.isDebugEnabled()) { - LOG.debug("postEntitiesAsync(entities=" + entities + ", callerUgi=" + + LOG.debug("putEntitiesAsync(entities=" + entities + ", callerUgi=" + callerUgi + ")"); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java index 7d42f94..9b23d59 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java @@ -104,13 +104,17 @@ public AboutInfo about( * context. */ @PUT - @Path("/entities") + @Path("/entities/{clusterId}/{userId}/{flowId}/{flowRunId}/{appId}") @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) public Response putEntities( @Context HttpServletRequest req, @Context HttpServletResponse res, + @PathParam("clusterId") String clusterId, + @PathParam("userId") String userId, + @PathParam("flowId") String flowId, + @PathParam("flowRunId") String flowRunId, + @PathParam("appId") String appId, @QueryParam("async") String async, - @QueryParam("appid") String appId, TimelineEntities entities) { init(res); UserGroupInformation callerUgi = getUser(req); @@ -133,7 +137,8 @@ public Response putEntities( LOG.error("Application not found"); throw new NotFoundException(); // different exception? } - service.postEntities(entities, callerUgi); + service.putEntities( + clusterId, userId, flowId, flowRunId, appId, entities, callerUgi); return Response.ok().build(); } catch (Exception e) { LOG.error("Error putting entities", e); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java index d6e2a18..c314803 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.Map; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -74,6 +75,9 @@ private String timelineRestServerBindAddress; private AggregatorNodemanagerProtocol nmAggregatorService; + + @VisibleForTesting + public boolean isUnitTest; private InetSocketAddress nmAggregatorServiceAddress; @@ -146,7 +150,7 @@ public TimelineAggregator putIfAbsent(ApplicationId appId, } // Report to NM if a new aggregator is added. - if (aggregatorIsNew) { + if (aggregatorIsNew && !isUnitTest) { try { reportNewAggregatorToNM(appId); } catch (Exception e) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java index 4a57e97..4b33c29 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java @@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; @@ -45,6 +46,7 @@ implements TimelineWriter { private String outputRoot; + private String entitiesDir; /** Config param for timeline service storage tmp root for FILE YARN-3264 */ public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT @@ -54,6 +56,8 @@ public static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT = "/tmp/timeline_service_data/"; + private static final String ENTITIES_DIR = "entities/"; + /** Default extension for output files */ public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist"; @@ -61,39 +65,44 @@ super((FileSystemTimelineWriterImpl.class.getName())); } - /** - * Stores the entire information in {@link TimelineEntity} to the - * timeline store. Any errors occurring for individual write request objects - * will be reported in the response. - * - * @param data - * a {@link TimelineEntity} object - * @return {@link TimelineWriteResponse} object. - * @throws IOException - */ + @Override - public TimelineWriteResponse write(TimelineEntities entities) - throws IOException { + public TimelineWriteResponse write(String clusterId, String userId, + String flowId, String flowRunId, String appId, TimelineEntities entities) + throws IOException { TimelineWriteResponse response = new TimelineWriteResponse(); for (TimelineEntity entity : entities.getEntities()) { - write(entity, response); + write(clusterId, userId, flowId, flowRunId, appId, entity, response); } return response; } - private void write(TimelineEntity entity, + private void write(String clusterId, String userId, + String flowId, String flowRunId, String appId, TimelineEntity entity, TimelineWriteResponse response) throws IOException { PrintWriter out = null; try { - File outputDir = new File(outputRoot + entity.getType()); - String fileName = outputDir + "/" + entity.getId() - + TIMELINE_SERVICE_STORAGE_EXTENSION; - if (!outputDir.exists()) { - if (!outputDir.mkdirs()) { - throw new IOException("Could not create directories for " + fileName); + File clusterDir = new File(entitiesDir + clusterId + "/"); + if (!clusterDir.exists()) { + if (!clusterDir.mkdirs()) { + throw new IOException("Could not create directories for " + entitiesDir); } } - out = new PrintWriter(new BufferedWriter(new FileWriter(fileName, true))); + String path = entitiesDir + clusterId + "/"; + createDir(path); + path += userId + "/"; + createDir(path); + path += flowId + "/"; + createDir(path); + path += flowRunId + "/"; + createDir(path); + path += appId + "/"; + createDir(path); + path += entity.getType() + "/"; + createDir(path); + path += entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION; + + out = new PrintWriter(new BufferedWriter(new FileWriter(path, true))); out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity)); out.write("\n"); } catch (IOException ioe) { @@ -112,20 +121,7 @@ private void write(TimelineEntity entity, } } - /** - * Aggregates the entity information to the timeline store based on which - * track this entity is to be rolled up to The tracks along which aggregations - * are to be done are given by {@link TimelineAggregationTrack} - * - * Any errors occurring for individual write request objects will be reported - * in the response. - * - * @param data - * a {@link TimelineEntity} object - * a {@link TimelineAggregationTrack} enum value - * @return a {@link TimelineWriteResponse} object. - * @throws IOException - */ + @Override public TimelineWriteResponse aggregate(TimelineEntity data, TimelineAggregationTrack track) throws IOException { return null; @@ -140,5 +136,21 @@ public String getOutputRoot() { public void serviceInit(Configuration conf) throws Exception { outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT, DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT); + entitiesDir = outputRoot + ENTITIES_DIR; + } + + @Override + public void serviceStart() throws Exception { + createDir(outputRoot); + createDir(entitiesDir); + } + + private static void createDir(String dirStr) throws IOException { + File clusterDir = new File(dirStr); + if (!clusterDir.exists()) { + if (!clusterDir.mkdirs()) { + throw new IOException("Could not create directories for " + dirStr); + } + } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java index 71ad7ab..85a1412 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; @@ -39,12 +40,19 @@ * timeline store. Any errors occurring for individual write request objects * will be reported in the response. * + * @param clusterId context cluster ID + * @param userId context user ID + * @param flowId context flow ID + * @param flowRunId context flow run ID + * @param appId context app ID * @param data * a {@link TimelineEntities} object. * @return a {@link TimelineWriteResponse} object. * @throws IOException */ - TimelineWriteResponse write(TimelineEntities data) throws IOException; + TimelineWriteResponse write(String clusterId, String userId, + String flowId, String flowRunId, String appId, + TimelineEntities data) throws IOException; /** * Aggregates the entity information to the timeline store based on which diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java index f720454..28ca9b5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java @@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.junit.Test; import org.apache.commons.io.FileUtils; @@ -55,25 +56,36 @@ public void testWriteEntityToFile() throws Exception { entity.setModifiedTime(1425016502000L); te.addEntity(entity); - FileSystemTimelineWriterImpl fsi = new FileSystemTimelineWriterImpl(); - fsi.serviceInit(new Configuration()); - fsi.write(te); + FileSystemTimelineWriterImpl fsi = null; + try { + fsi = new FileSystemTimelineWriterImpl(); + fsi.init(new YarnConfiguration()); + fsi.start(); + fsi.write("cluster_id", "user_id", "flow_id", "flow_run_id", "app_id", te); - String fileName = fsi.getOutputRoot() + "/" + type + "/" + id - + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; - Path path = Paths.get(fileName); - File f = new File(fileName); - assertTrue(f.exists() && !f.isDirectory()); - List data = Files.readAllLines(path, StandardCharsets.UTF_8); - // ensure there's only one entity + 1 new line - assertTrue(data.size() == 2); - String d = data.get(0); - // confirm the contents same as what was written - assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity)); + String fileName = fsi.getOutputRoot() + + "entities/cluster_id/user_id/flow_id/flow_run_id/app_id/" + type + + "/" + id + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + Path path = Paths.get(fileName); + File f = new File(fileName); + assertTrue(f.exists() && !f.isDirectory()); + List data = Files.readAllLines(path, StandardCharsets.UTF_8); + // ensure there's only one entity + 1 new line + assertTrue(data.size() == 2); + String d = data.get(0); + // confirm the contents same as what was written + assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity)); + + // delete the directory + File outputDir = new File(fsi.getOutputRoot()); + FileUtils.deleteDirectory(outputDir); + assertTrue(!(f.exists())); + } finally { + if (fsi != null) { + fsi.stop(); + } + new File(fsi.getOutputRoot()).delete(); + } - // delete the directory - File outputDir = new File(fsi.getOutputRoot()); - FileUtils.deleteDirectory(outputDir); - assertTrue(!(f.exists())); } }