diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 2afff435980..779dc7f076d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2526,6 +2526,12 @@ public static boolean isAclEnabled(Configuration conf) { "org.apache.hadoop.yarn.server.timelineservice.storage" + ".HBaseTimelineReaderImpl"; + public static final String TIMELINE_SERVICE_SUBAPP_ENTITY_WRITE_ENABLED = + TIMELINE_SERVICE_PREFIX + "subapplication-entity-write.enabled"; + + public static final boolean + DEFAULT_TIMELINE_SERVICE_SUBAPP_ENTITY_WRITE_ENABLED = false; + /** * default schema prefix for hbase tables. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java index 02c9519d9be..89a7cbe17c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java @@ -84,6 +84,8 @@ private UserGroupInformation authUgi; + private boolean subApplicationWrite; + public TimelineV2ClientImpl(ApplicationId appId) { super(TimelineV2ClientImpl.class.getName()); this.contextAppId = appId; @@ -123,6 +125,9 @@ protected void serviceInit(Configuration conf) throws Exception { serviceRetryInterval = conf.getLong( YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS); + subApplicationWrite = conf.getBoolean( + YarnConfiguration.TIMELINE_SERVICE_SUBAPP_ENTITY_WRITE_ENABLED, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_SUBAPP_ENTITY_WRITE_ENABLED); entityDispatcher = new TimelineEntityDispatcher(conf); super.serviceInit(conf); } @@ -353,6 +358,7 @@ public Void call() throws Exception { MultivaluedMap params = new MultivaluedMapImpl(); params.add("appid", getContextAppId().toString()); params.add("async", Boolean.toString(!isSync)); + params.add("subappwrite", Boolean.toString(subApplicationWrite)); putObjects("entities", params, entities); return null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index e192a0d9a97..038e5f6e675 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2453,6 +2453,16 @@ + + Timeline client configuration that enables whether to store entity + information into sub application entity table. This is usefull when + entity need to be queried outside the scope of application. + + yarn.timeline-service.subapplication-entity-write.enabled + false + + + yarn.timeline-service.writer.class Storage implementation ATS v2 will use for the TimelineWriter service. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java index 90a69595e55..3eb11c6d86f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java @@ -212,8 +212,11 @@ public void testWriteEntityToHBase() throws Exception { String appName = HBaseTimelineSchemaUtils.convertApplicationIdToString( ApplicationId.newInstance(System.currentTimeMillis() + 9000000L, 1) ); - hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, - runid, appName), te, + TimelineCollectorContext context = + new TimelineCollectorContext(cluster, user, flow, flowVersion, runid, + appName); + context.setSubApplicationWrite(true); + hbi.write(context, te, UserGroupInformation.createRemoteUser(subAppUser)); hbi.stop(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index 027505b5601..959c27f2e29 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -197,7 +197,7 @@ public TimelineWriteResponse write(TimelineCollectorContext context, store(rowKey, te, flowVersion, Tables.ENTITY_TABLE); } - if (!isApplication && !userId.equals(subApplicationUser)) { + if (!isApplication && context.isSubApplicationWrite()) { SubApplicationRowKey subApplicationRowKey = new SubApplicationRowKey(subApplicationUser, clusterId, te.getType(), te.getIdPrefix(), te.getId(), userId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java index 8202431459d..3a25b9fac51 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -126,7 +126,7 @@ protected boolean isReadyToAggregate() { * * This method should be reserved for selected critical entities and events. * For normal voluminous writes one should use the async method - * {@link #putEntitiesAsync(TimelineEntities, UserGroupInformation)}. + * {@link #putEntitiesAsync(TimelineEntities, UserGroupInformation, boolean)}. * * @param entities entities to post * @param callerUgi the caller UGI @@ -135,7 +135,7 @@ protected boolean isReadyToAggregate() { * entities. */ public TimelineWriteResponse putEntities(TimelineEntities entities, - UserGroupInformation callerUgi) throws IOException { + UserGroupInformation callerUgi, boolean subappwrite) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("putEntities(entities=" + entities + ", callerUgi=" + callerUgi + ")"); @@ -146,21 +146,30 @@ public TimelineWriteResponse putEntities(TimelineEntities entities, // flush the writer buffer concurrently and swallow any exception // caused by the timeline enitites that are being put here. synchronized (writer) { - response = writeTimelineEntities(entities, callerUgi); + response = writeTimelineEntities(entities, callerUgi, subappwrite); flushBufferedTimelineEntities(); } return response; } + public void putEntities(TimelineEntities entities, + UserGroupInformation callerUgi) throws IOException { + putEntities(entities, callerUgi, false); + } + private TimelineWriteResponse writeTimelineEntities( - TimelineEntities entities, UserGroupInformation callerUgi) + TimelineEntities entities, UserGroupInformation callerUgi, boolean subappwrite) throws IOException { // Update application metrics for aggregation updateAggregateStatus(entities, aggregationGroups, getEntityTypesSkipAggregation()); - final TimelineCollectorContext context = getTimelineEntityContext(); + TimelineCollectorContext context = + new TimelineCollectorContext(getTimelineEntityContext()); + // override to client value + context.setSubApplicationWrite(subappwrite); + return writer.write(context, entities, callerUgi); } @@ -187,13 +196,18 @@ private void flushBufferedTimelineEntities() throws IOException { * entities. */ public void putEntitiesAsync(TimelineEntities entities, - UserGroupInformation callerUgi) throws IOException { + UserGroupInformation callerUgi, boolean subappwrite) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("putEntitiesAsync(entities=" + entities + ", callerUgi=" + callerUgi + ")"); } - writeTimelineEntities(entities, callerUgi); + writeTimelineEntities(entities, callerUgi,subappwrite); + } + + public void putEntitiesAsync(TimelineEntities entities, + UserGroupInformation callerUgi) throws IOException { + putEntitiesAsync(entities, callerUgi, false); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java index 981ee2af5e0..57e641b723c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java @@ -26,6 +26,7 @@ */ public class TimelineCollectorContext extends TimelineContext { private String flowVersion; + private boolean subApplicationWrite; public TimelineCollectorContext() { this(null, null, null, null, 0L, null); @@ -38,6 +39,13 @@ public TimelineCollectorContext(String clusterId, String userId, TimelineUtils.DEFAULT_FLOW_VERSION : flowVersion; } + // copy constructor + public TimelineCollectorContext(TimelineCollectorContext context) { + this(context.getClusterId(), context.getUserId(), context.getFlowName(), + context.getFlowVersion(), context.getFlowRunId(), context.getAppId()); + this.subApplicationWrite=context.isSubApplicationWrite(); + } + @Override public int hashCode() { final int prime = 31; @@ -73,4 +81,12 @@ public String getFlowVersion() { public void setFlowVersion(String version) { this.flowVersion = version; } + + public boolean isSubApplicationWrite() { + return subApplicationWrite; + } + + public void setSubApplicationWrite(boolean subApplicationWrite) { + this.subApplicationWrite = subApplicationWrite; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java index efb5d6bf04c..608da5c5f80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java @@ -143,6 +143,7 @@ public Response putEntities( @Context HttpServletResponse res, @QueryParam("async") String async, @QueryParam("appid") String appId, + @QueryParam("subappwrite") String subappwrite, TimelineEntities entities) { init(res); UserGroupInformation callerUgi = getUser(req); @@ -168,10 +169,11 @@ public Response putEntities( boolean isAsync = async != null && async.trim().equalsIgnoreCase("true"); if (isAsync) { - collector.putEntitiesAsync( - processTimelineEntities(entities), callerUgi); + collector.putEntitiesAsync(processTimelineEntities(entities), callerUgi, + Boolean.valueOf(subappwrite)); } else { - collector.putEntities(processTimelineEntities(entities), callerUgi); + collector.putEntities(processTimelineEntities(entities), callerUgi, + Boolean.valueOf(subappwrite)); } return Response.ok().build(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java index 12bc1cb3f0e..35be4dade6e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java @@ -47,7 +47,7 @@ * @throws IOException if there is any exception encountered while storing or * writing entities to the back end storage. */ - TimelineWriteResponse write(TimelineCollectorContext context, + TimelineWriteResponse write(final TimelineCollectorContext context, TimelineEntities data, UserGroupInformation callerUgi) throws IOException; /**