From 76c40c7fb63856e664e035c89822ad883088d380 Mon Sep 17 00:00:00 2001 From: Rohith Sharma K S Date: Tue, 27 Mar 2018 17:59:11 +0530 Subject: [PATCH] YARN-6936 --- .../hadoop/yarn/client/api/TimelineV2Client.java | 31 ++++++++++++++++++++++ .../yarn/client/api/impl/TimelineV2ClientImpl.java | 28 +++++++++++++++---- .../storage/TestHBaseTimelineStorageEntities.java | 7 +++-- .../storage/HBaseTimelineWriterImpl.java | 2 +- .../collector/TimelineCollector.java | 29 +++++++++++++++----- .../collector/TimelineCollectorContext.java | 16 +++++++++++ .../collector/TimelineCollectorWebService.java | 8 +++--- .../timelineservice/storage/TimelineWriter.java | 2 +- 8 files changed, 104 insertions(+), 19 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java index 423c059319c..1de909aa698 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java @@ -93,4 +93,35 @@ public abstract void putEntitiesAsync(TimelineEntity... entities) * address and timeline delegation token. */ public abstract void setTimelineCollectorInfo(CollectorInfo collectorInfo); + + + /** + *

+ * Send the information of a number of conceptual entities to the timeline + * service v.2 collector. It is a blocking API. The method will not return + * until all the put entities have been persisted. + *

+ * + * @param entities the collection of {@link TimelineEntity} + * @throws IOException if there are I/O errors + * @throws YarnException if entities are incomplete/invalid + */ + @Public + public abstract void putSubAppEntities(TimelineEntity... entities) + throws IOException, YarnException; + + /** + *

+ * Send the information of a number of conceptual entities to the timeline + * service v.2 collector. It is an asynchronous API. The method will return + * once all the entities are received. + *

+ * + * @param entities the collection of {@link TimelineEntity} + * @throws IOException if there are I/O errors + * @throws YarnException if entities are incomplete/invalid + */ + @Public + public abstract void putSubAppEntitiesAsync(TimelineEntity... entities) + throws IOException, YarnException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java index 02c9519d9be..bf733e6bc27 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java @@ -69,6 +69,7 @@ private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/"; private TimelineEntityDispatcher entityDispatcher; + private TimelineEntityDispatcher subAppEntityDispatcher; private volatile String timelineServiceAddress; @VisibleForTesting volatile Token currentTimelineToken = null; @@ -124,6 +125,7 @@ protected void serviceInit(Configuration conf) throws Exception { YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS); entityDispatcher = new TimelineEntityDispatcher(conf); + subAppEntityDispatcher = new TimelineEntityDispatcher(conf); super.serviceInit(conf); } @@ -131,24 +133,38 @@ protected void serviceInit(Configuration conf) throws Exception { protected void serviceStart() throws Exception { super.serviceStart(); entityDispatcher.start(); + subAppEntityDispatcher.start(); } @Override protected void serviceStop() throws Exception { entityDispatcher.stop(); + subAppEntityDispatcher.stop(); super.serviceStop(); } @Override public void putEntities(TimelineEntity... entities) throws IOException, YarnException { - entityDispatcher.dispatchEntities(true, entities); + entityDispatcher.dispatchEntities(true, entities, false); } @Override public void putEntitiesAsync(TimelineEntity... entities) throws IOException, YarnException { - entityDispatcher.dispatchEntities(false, entities); + entityDispatcher.dispatchEntities(false, entities, false); + } + + @Override + public void putSubAppEntities(TimelineEntity... entities) + throws IOException, YarnException { + subAppEntityDispatcher.dispatchEntities(true, entities, true); + } + + @Override + public void putSubAppEntitiesAsync(TimelineEntity... entities) + throws IOException, YarnException { + subAppEntityDispatcher.dispatchEntities(false, entities, true); } @Override @@ -346,13 +362,15 @@ private int pollTimelineServiceAddress(int retries) throws YarnException { private final TimelineEntities entities; private final boolean isSync; - EntitiesHolder(final TimelineEntities entities, final boolean isSync) { + EntitiesHolder(final TimelineEntities entities, final boolean isSync, + final boolean subappwrite) { super(new Callable() { // publishEntities() public Void call() throws Exception { MultivaluedMap params = new MultivaluedMapImpl(); params.add("appid", getContextAppId().toString()); params.add("async", Boolean.toString(!isSync)); + params.add("subappwrite", Boolean.toString(subappwrite)); putObjects("entities", params, entities); return null; } @@ -496,7 +514,7 @@ private void publishWithoutBlockingOnQueue( } public void dispatchEntities(boolean sync, - TimelineEntity[] entitiesTobePublished) throws YarnException { + TimelineEntity[] entitiesTobePublished, boolean subappwrite) throws YarnException { if (executor.isShutdown()) { throw new YarnException("Timeline client is in the process of stopping," + " not accepting any more TimelineEntities"); @@ -509,7 +527,7 @@ public void dispatchEntities(boolean sync, } // created a holder and place it in queue - EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync); + EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync, subappwrite); try { timelineEntityQueue.put(entitiesHolder); } catch (InterruptedException e) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java index 90a69595e55..3eb11c6d86f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java @@ -212,8 +212,11 @@ public void testWriteEntityToHBase() throws Exception { String appName = HBaseTimelineSchemaUtils.convertApplicationIdToString( ApplicationId.newInstance(System.currentTimeMillis() + 9000000L, 1) ); - hbi.write(new TimelineCollectorContext(cluster, user, flow, flowVersion, - runid, appName), te, + TimelineCollectorContext context = + new TimelineCollectorContext(cluster, user, flow, flowVersion, runid, + appName); + context.setSubApplicationWrite(true); + hbi.write(context, te, UserGroupInformation.createRemoteUser(subAppUser)); hbi.stop(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index 027505b5601..959c27f2e29 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -197,7 +197,7 @@ public TimelineWriteResponse write(TimelineCollectorContext context, store(rowKey, te, flowVersion, Tables.ENTITY_TABLE); } - if (!isApplication && !userId.equals(subApplicationUser)) { + if (!isApplication && context.isSubApplicationWrite()) { SubApplicationRowKey subApplicationRowKey = new SubApplicationRowKey(subApplicationUser, clusterId, te.getType(), te.getIdPrefix(), te.getId(), userId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java index 8202431459d..c8d5e83c28c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -126,7 +126,7 @@ protected boolean isReadyToAggregate() { * * This method should be reserved for selected critical entities and events. * For normal voluminous writes one should use the async method - * {@link #putEntitiesAsync(TimelineEntities, UserGroupInformation)}. + * {@link #putEntitiesAsync(TimelineEntities, UserGroupInformation, boolean)}. * * @param entities entities to post * @param callerUgi the caller UGI @@ -135,7 +135,7 @@ protected boolean isReadyToAggregate() { * entities. */ public TimelineWriteResponse putEntities(TimelineEntities entities, - UserGroupInformation callerUgi) throws IOException { + UserGroupInformation callerUgi, boolean subappwrite) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("putEntities(entities=" + entities + ", callerUgi=" + callerUgi + ")"); @@ -146,21 +146,30 @@ public TimelineWriteResponse putEntities(TimelineEntities entities, // flush the writer buffer concurrently and swallow any exception // caused by the timeline enitites that are being put here. synchronized (writer) { - response = writeTimelineEntities(entities, callerUgi); + response = writeTimelineEntities(entities, callerUgi, subappwrite); flushBufferedTimelineEntities(); } return response; } + public void putEntities(TimelineEntities entities, + UserGroupInformation callerUgi) throws IOException { + putEntities(entities, callerUgi, false); + } + private TimelineWriteResponse writeTimelineEntities( - TimelineEntities entities, UserGroupInformation callerUgi) + TimelineEntities entities, UserGroupInformation callerUgi, boolean isSubAppEntities) throws IOException { // Update application metrics for aggregation updateAggregateStatus(entities, aggregationGroups, getEntityTypesSkipAggregation()); - final TimelineCollectorContext context = getTimelineEntityContext(); + TimelineCollectorContext context = + new TimelineCollectorContext(getTimelineEntityContext()); + // override to client value + context.setSubApplicationWrite(isSubAppEntities); + return writer.write(context, entities, callerUgi); } @@ -183,17 +192,23 @@ private void flushBufferedTimelineEntities() throws IOException { * * @param entities entities to post * @param callerUgi the caller UGI + * @param isSubAppEntities is sub app entities * @throws IOException if there is any exception encounted while putting * entities. */ public void putEntitiesAsync(TimelineEntities entities, - UserGroupInformation callerUgi) throws IOException { + UserGroupInformation callerUgi, boolean isSubAppEntities) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("putEntitiesAsync(entities=" + entities + ", callerUgi=" + callerUgi + ")"); } - writeTimelineEntities(entities, callerUgi); + writeTimelineEntities(entities, callerUgi,isSubAppEntities); + } + + public void putEntitiesAsync(TimelineEntities entities, + UserGroupInformation callerUgi) throws IOException { + putEntitiesAsync(entities, callerUgi, false); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java index 981ee2af5e0..57e641b723c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java @@ -26,6 +26,7 @@ */ public class TimelineCollectorContext extends TimelineContext { private String flowVersion; + private boolean subApplicationWrite; public TimelineCollectorContext() { this(null, null, null, null, 0L, null); @@ -38,6 +39,13 @@ public TimelineCollectorContext(String clusterId, String userId, TimelineUtils.DEFAULT_FLOW_VERSION : flowVersion; } + // copy constructor + public TimelineCollectorContext(TimelineCollectorContext context) { + this(context.getClusterId(), context.getUserId(), context.getFlowName(), + context.getFlowVersion(), context.getFlowRunId(), context.getAppId()); + this.subApplicationWrite=context.isSubApplicationWrite(); + } + @Override public int hashCode() { final int prime = 31; @@ -73,4 +81,12 @@ public String getFlowVersion() { public void setFlowVersion(String version) { this.flowVersion = version; } + + public boolean isSubApplicationWrite() { + return subApplicationWrite; + } + + public void setSubApplicationWrite(boolean subApplicationWrite) { + this.subApplicationWrite = subApplicationWrite; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java index efb5d6bf04c..c860ad0ae65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java @@ -143,6 +143,7 @@ public Response putEntities( @Context HttpServletResponse res, @QueryParam("async") String async, @QueryParam("appid") String appId, + @QueryParam("subappwrite") String isSubAppEntities, TimelineEntities entities) { init(res); UserGroupInformation callerUgi = getUser(req); @@ -168,10 +169,11 @@ public Response putEntities( boolean isAsync = async != null && async.trim().equalsIgnoreCase("true"); if (isAsync) { - collector.putEntitiesAsync( - processTimelineEntities(entities), callerUgi); + collector.putEntitiesAsync(processTimelineEntities(entities), callerUgi, + Boolean.valueOf(isSubAppEntities)); } else { - collector.putEntities(processTimelineEntities(entities), callerUgi); + collector.putEntities(processTimelineEntities(entities), callerUgi, + Boolean.valueOf(isSubAppEntities)); } return Response.ok().build(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java index 12bc1cb3f0e..35be4dade6e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java @@ -47,7 +47,7 @@ * @throws IOException if there is any exception encountered while storing or * writing entities to the back end storage. */ - TimelineWriteResponse write(TimelineCollectorContext context, + TimelineWriteResponse write(final TimelineCollectorContext context, TimelineEntities data, UserGroupInformation callerUgi) throws IOException; /** -- 2.13.6 (Apple Git-96)