diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java index 7a289b9..845e2cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java @@ -549,20 +549,10 @@ public boolean equals(Object obj) { public int compareTo(TimelineEntity other) { int comparison = getType().compareTo(other.getType()); if (comparison == 0) { - if (getCreatedTime() == null) { - if (other.getCreatedTime() == null) { - return getId().compareTo(other.getId()); - } else { - return 1; - } - } - if (other.getCreatedTime() == null) { - return -1; - } - if (getCreatedTime() > other.getCreatedTime()) { - // Order by created time desc + if (getIdPrefix() > other.getIdPrefix()) { + // Descending order by entity id prefix return -1; - } else if (getCreatedTime() < other.getCreatedTime()) { + } else if (getIdPrefix() < other.getIdPrefix()) { return 1; } else { return getId().compareTo(other.getId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java index db1c1cf..f4d5601 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java @@ -806,7 +806,8 @@ public void testGetEntitiesByUID() throws Exception { assertEquals(TimelineUIDConverter.GENERIC_ENTITY_UID.encodeUID( new TimelineReaderContext(context.getClusterId(), context.getUserId(), context.getFlowName(), - context.getFlowRunId(), context.getAppId(), "type1", + context.getFlowRunId(), context.getAppId(), "type1", + entity.getIdPrefix(), entity.getId())), entityUID); } } @@ -909,8 +910,8 @@ public void testUIDQueryWithAndWithoutFlowContextInfo() throws Exception { String uid = (String) entity.getInfo().get(TimelineReaderManager.UID_KEY); assertNotNull(uid); - assertTrue(uid.equals(appUIDWithoutFlowInfo + "!type1!entity1") || - uid.equals(appUIDWithoutFlowInfo + "!type1!entity2")); + assertTrue(uid.equals(appUIDWithoutFlowInfo + "!type1!0!entity1") + || uid.equals(appUIDWithoutFlowInfo + "!type1!0!entity2")); } String entityUIDWithFlowInfo = appUIDWithFlowInfo + "!type1!entity1"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntityFilters.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntityFilters.java index 8f2b725..2cfe1e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntityFilters.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntityFilters.java @@ -115,6 +115,8 @@ private TimelineFilterList eventFilters; private static final long DEFAULT_BEGIN_TIME = 0L; private static final long DEFAULT_END_TIME = Long.MAX_VALUE; + private String fromId; + private Long fromIdPrefix; /** * Default limit of number of entities to return for getEntities API. @@ -125,6 +127,19 @@ public TimelineEntityFilters() { this(null, null, null, null, null, null, null, null, null); } + public TimelineEntityFilters(Long entityLimit, Long timeBegin, Long timeEnd, + TimelineFilterList entityRelatesTo, TimelineFilterList entityIsRelatedTo, + TimelineFilterList entityInfoFilters, + TimelineFilterList entityConfigFilters, + TimelineFilterList entityMetricFilters, + TimelineFilterList entityEventFilters, Long fromidprefix, String fromid) { + this(entityLimit, timeBegin, timeEnd, entityRelatesTo, entityIsRelatedTo, + entityInfoFilters, entityConfigFilters, entityMetricFilters, + entityEventFilters); + this.fromIdPrefix = fromidprefix; + this.fromId = fromid; + } + public TimelineEntityFilters( Long entityLimit, Long timeBegin, Long timeEnd, TimelineFilterList entityRelatesTo, @@ -239,4 +254,20 @@ public TimelineFilterList getEventFilters() { public void setEventFilters(TimelineFilterList filters) { this.eventFilters = filters; } + + public String getFromId() { + return fromId; + } + + public void setFromId(String fromId) { + this.fromId = fromId; + } + + public Long getFromIdPrefix() { + return fromIdPrefix; + } + + public void setFromIdPrefix(Long fromIdPrefix) { + this.fromIdPrefix = fromIdPrefix; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderContext.java index 633bb23..5f308cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderContext.java @@ -31,6 +31,7 @@ private String entityType; private String entityId; + private Long entityIdPrefix; public TimelineReaderContext(String clusterId, String userId, String flowName, Long flowRunId, String appId, String entityType, String entityId) { super(clusterId, userId, flowName, flowRunId, appId); @@ -38,16 +39,25 @@ public TimelineReaderContext(String clusterId, String userId, String flowName, this.entityId = entityId; } + public TimelineReaderContext(String clusterId, String userId, String flowName, + Long flowRunId, String appId, String entityType, Long entityIdPrefix, + String entityId) { + this(clusterId, userId, flowName, flowRunId, appId, entityType, entityId); + this.entityIdPrefix = entityIdPrefix; + } + public TimelineReaderContext(TimelineReaderContext other) { this(other.getClusterId(), other.getUserId(), other.getFlowName(), other.getFlowRunId(), other.getAppId(), other.getEntityType(), - other.getEntityId()); + other.getEntityIdPrefix(), other.getEntityId()); } @Override public int hashCode() { final int prime = 31; int result = super.hashCode(); + result = prime * result + + ((entityIdPrefix == null) ? 0 : entityIdPrefix.hashCode()); result = prime * result + ((entityId == null) ? 0 : entityId.hashCode()); result = prime * result + ((entityType == null) ? 0 : entityType.hashCode()); @@ -95,4 +105,12 @@ public String getEntityId() { public void setEntityId(String id) { this.entityId = id; } + + public Long getEntityIdPrefix() { + return entityIdPrefix; + } + + public void setEntityIdPrefix(Long entityIdPrefix) { + this.entityIdPrefix = entityIdPrefix; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java index 6e8b823..66e4cbf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java @@ -113,6 +113,7 @@ private static void fillUID(TimelineEntityType entityType, } } context.setEntityType(entity.getType()); + context.setEntityIdPrefix(entity.getIdPrefix()); context.setEntityId(entity.getId()); entity.setUID(UID_KEY, TimelineUIDConverter.GENERIC_ENTITY_UID.encodeUID(context)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java index 6e8e029..cfdfbc5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java @@ -265,6 +265,11 @@ public TimelineAbout about( * or has a value less than 1, and metrics have to be retrieved, then * metricsLimit will be considered as 1 i.e. latest single value of * metric(s) will be returned. (Optional query param). + * @param fromIdPrefix If specified, then retrieve entities earlier than and + * including the specified ID. If it is null, retrieved entities are + * always first 'N' rows in the back end storage where N is limit. + * @param fromId If specified, then fromIdPrefix mandatory. Otherwise fromId + * is not accountable. * * @return If successful, a HTTP 200(OK) response having a JSON representing * a set of TimelineEntity instances of the given entity type @@ -295,7 +300,9 @@ public TimelineAbout about( @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("fromidprefix") String fromIdPrefix, + @QueryParam("fromid") String fromId) { String url = req.getRequestURI() + (req.getQueryString() == null ? "" : QUERY_STRING_SEP + req.getQueryString()); @@ -307,18 +314,21 @@ public TimelineAbout about( init(res); TimelineReaderManager timelineReaderManager = getTimelineReaderManager(); Set entities = null; + try { TimelineReaderContext context = TimelineUIDConverter.APPLICATION_UID.decodeUID(uId); if (context == null) { throw new BadRequestException("Incorrect UID " + uId); } - context.setEntityType( - TimelineReaderWebServicesUtils.parseStr(entityType)); + context + .setEntityType(TimelineReaderWebServicesUtils.parseStr(entityType)); + entities = timelineReaderManager.getEntities(context, TimelineReaderWebServicesUtils.createTimelineEntityFilters( limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, - infofilters, conffilters, metricfilters, eventfilters), + infofilters, conffilters, metricfilters, eventfilters, + fromIdPrefix, fromId), TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); } catch (Exception e) { @@ -401,6 +411,11 @@ public TimelineAbout about( * or has a value less than 1, and metrics have to be retrieved, then * metricsLimit will be considered as 1 i.e. latest single value of * metric(s) will be returned. (Optional query param). + * @param fromIdPrefix If specified, then retrieve entities earlier than and + * including the specified ID. If it is null, retrieved entities are + * always first 'N' rows in the back end storage where N is limit. + * @param fromId If specified, then fromIdPrefix mandatory. Otherwise fromId + * is not accountable. * * @return If successful, a HTTP 200(OK) response having a JSON representing * a set of TimelineEntity instances of the given entity type @@ -436,11 +451,14 @@ public TimelineAbout about( @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("fromidprefix") String fromIdPrefix, + @QueryParam("fromid") String fromId) { return getEntities(req, res, null, appId, entityType, userId, flowName, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, - confsToRetrieve, metricsToRetrieve, fields, metricsLimit); + confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromIdPrefix, + fromId); } /** @@ -511,6 +529,11 @@ public TimelineAbout about( * or has a value less than 1, and metrics have to be retrieved, then * metricsLimit will be considered as 1 i.e. latest single value of * metric(s) will be returned. (Optional query param). + * @param fromIdPrefix If specified, then retrieve entities earlier than and + * including the specified ID. If it is null, retrieved entities are + * always first 'N' rows in the back end storage where N is limit. + * @param fromId If specified, then fromIdPrefix mandatory. Otherwise fromId + * is not accountable. * * @return If successful, a HTTP 200(OK) response having a JSON representing * a set of TimelineEntity instances of the given entity type @@ -547,7 +570,9 @@ public TimelineAbout about( @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("fromidprefix") String fromIdPrefix, + @QueryParam("fromid") String fromId) { String url = req.getRequestURI() + (req.getQueryString() == null ? "" : QUERY_STRING_SEP + req.getQueryString()); @@ -559,13 +584,16 @@ public TimelineAbout about( init(res); TimelineReaderManager timelineReaderManager = getTimelineReaderManager(); Set entities = null; + try { - entities = timelineReaderManager.getEntities( - TimelineReaderWebServicesUtils.createTimelineReaderContext( - clusterId, userId, flowName, flowRunId, appId, entityType, null), + TimelineReaderContext context = TimelineReaderWebServicesUtils + .createTimelineReaderContext(clusterId, userId, flowName, flowRunId, + appId, entityType, null, null); + entities = timelineReaderManager.getEntities(context, TimelineReaderWebServicesUtils.createTimelineEntityFilters( limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, - infofilters, conffilters, metricfilters, eventfilters), + infofilters, conffilters, metricfilters, eventfilters, + fromIdPrefix, fromId), TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); } catch (Exception e) { @@ -704,6 +732,7 @@ public TimelineEntity getEntity( * or has a value less than 1, and metrics have to be retrieved, then * metricsLimit will be considered as 1 i.e. latest single value of * metric(s) will be returned. (Optional query param). + * @param entityIdPrefix If specified, then entity retrieval is faster. * * @return If successful, a HTTP 200(OK) response having a JSON representing a * TimelineEntity instance is returned.
@@ -730,10 +759,11 @@ public TimelineEntity getEntity( @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("entityidprefix") String entityIdPrefix) { return getEntity(req, res, null, appId, entityType, entityId, userId, flowName, flowRunId, confsToRetrieve, metricsToRetrieve, fields, - metricsLimit); + metricsLimit, entityIdPrefix); } /** @@ -775,6 +805,7 @@ public TimelineEntity getEntity( * or has a value less than 1, and metrics have to be retrieved, then * metricsLimit will be considered as 1 i.e. latest single value of * metric(s) will be returned. (Optional query param). + * @param entityIdPrefix If specified, then entity retrieval is faster. * * @return If successful, a HTTP 200(OK) response having a JSON representing a * TimelineEntity instance is returned.
@@ -802,7 +833,8 @@ public TimelineEntity getEntity( @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("entityidprefix") String entityIdPrefix) { String url = req.getRequestURI() + (req.getQueryString() == null ? "" : QUERY_STRING_SEP + req.getQueryString()); @@ -817,7 +849,8 @@ public TimelineEntity getEntity( try { entity = timelineReaderManager.getEntity( TimelineReaderWebServicesUtils.createTimelineReaderContext( - clusterId, userId, flowName, flowRunId, appId, entityType, entityId), + clusterId, userId, flowName, flowRunId, appId, entityType, + entityIdPrefix, entityId), TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); } catch (Exception e) { @@ -993,7 +1026,7 @@ public TimelineEntity getFlowRun( entity = timelineReaderManager.getEntity( TimelineReaderWebServicesUtils.createTimelineReaderContext( clusterId, userId, flowName, flowRunId, null, - TimelineEntityType.YARN_FLOW_RUN.toString(), null), + TimelineEntityType.YARN_FLOW_RUN.toString(), null, null), TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( null, metricsToRetrieve, null, null)); } catch (Exception e) { @@ -1082,7 +1115,7 @@ public TimelineEntity getFlowRun( entities = timelineReaderManager.getEntities(context, TimelineReaderWebServicesUtils.createTimelineEntityFilters( limit, createdTimeStart, createdTimeEnd, null, null, null, - null, null, null), + null, null, null, null, null), TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( null, metricsToRetrieve, fields, null)); } catch (Exception e) { @@ -1218,10 +1251,10 @@ public TimelineEntity getFlowRun( entities = timelineReaderManager.getEntities( TimelineReaderWebServicesUtils.createTimelineReaderContext( clusterId, userId, flowName, null, null, - TimelineEntityType.YARN_FLOW_RUN.toString(), null), + TimelineEntityType.YARN_FLOW_RUN.toString(), null, null), TimelineReaderWebServicesUtils.createTimelineEntityFilters( limit, createdTimeStart, createdTimeEnd, null, null, null, - null, null, null), + null, null, null, null, null), TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( null, metricsToRetrieve, fields, null)); } catch (Exception e) { @@ -1340,13 +1373,14 @@ public TimelineEntity getFlowRun( DateRange range = parseDateRange(dateRange); TimelineEntityFilters entityFilters = TimelineReaderWebServicesUtils.createTimelineEntityFilters( - limit, null, null, null, null, null, null, null, null); + limit, null, null, null, null, null, null, null, null, null, + null); entityFilters.setCreatedTimeBegin(range.dateStart); entityFilters.setCreatedTimeEnd(range.dateEnd); entities = timelineReaderManager.getEntities( TimelineReaderWebServicesUtils.createTimelineReaderContext( clusterId, null, null, null, null, - TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null), + TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null, null), entityFilters, TimelineReaderWebServicesUtils. createTimelineDataToRetrieve(null, null, null, null)); } catch (Exception e) { @@ -1585,7 +1619,7 @@ public TimelineEntity getApp( entity = timelineReaderManager.getEntity( TimelineReaderWebServicesUtils.createTimelineReaderContext( clusterId, userId, flowName, flowRunId, appId, - TimelineEntityType.YARN_APPLICATION.toString(), null), + TimelineEntityType.YARN_APPLICATION.toString(), null, null), TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); } catch (Exception e) { @@ -1711,7 +1745,8 @@ public TimelineEntity getApp( entities = timelineReaderManager.getEntities(context, TimelineReaderWebServicesUtils.createTimelineEntityFilters( limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, - infofilters, conffilters, metricfilters, eventfilters), + infofilters, conffilters, metricfilters, eventfilters, null, + null), TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); } catch (Exception e) { @@ -1823,7 +1858,7 @@ public TimelineEntity getApp( TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, - confsToRetrieve, metricsToRetrieve, fields, metricsLimit); + confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, null); } /** @@ -1925,7 +1960,7 @@ public TimelineEntity getApp( TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, - confsToRetrieve, metricsToRetrieve, fields, metricsLimit); + confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, null); } /** @@ -2021,7 +2056,7 @@ public TimelineEntity getApp( TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName, null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, - confsToRetrieve, metricsToRetrieve, fields, metricsLimit); + confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, null); } /** @@ -2119,7 +2154,7 @@ public TimelineEntity getApp( TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName, null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, - confsToRetrieve, metricsToRetrieve, fields, metricsLimit); + confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, null); } /** @@ -2190,6 +2225,11 @@ public TimelineEntity getApp( * have to be retrieved, then metricsLimit will be considered as 1 * i.e. latest single value of metric(s) will be returned. (Optional * query param). + * @param fromIdPrefix If specified, then retrieve entities earlier than and + * including the specified ID. If it is null, retrieved entities are + * always first 'N' rows in the back end storage where N is limit. + * @param fromId If specified, then fromIdPrefix mandatory. Otherwise fromId + * is not accountable. * * @return If successful, a HTTP 200(OK) response having a JSON representing a * set of TimelineEntity instances of the app-attempt @@ -2222,12 +2262,14 @@ public TimelineEntity getApp( @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("fromidprefix") String fromIdPrefix, + @QueryParam("fromid") String fromId) { return getAppAttempts(req, res, null, appId, userId, flowName, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, confsToRetrieve, - metricsToRetrieve, fields, metricsLimit); + metricsToRetrieve, fields, metricsLimit, fromIdPrefix, fromId); } /** @@ -2299,6 +2341,11 @@ public TimelineEntity getApp( * have to be retrieved, then metricsLimit will be considered as 1 * i.e. latest single value of metric(s) will be returned. (Optional * query param). + * @param fromIdPrefix If specified, then retrieve entities earlier than and + * including the specified ID. If it is null, retrieved entities are + * always first 'N' rows in the back end storage where N is limit. + * @param fromId If specified, then fromIdPrefix mandatory. Otherwise fromId + * is not accountable. * * @return If successful, a HTTP 200(OK) response having a JSON representing a * set of TimelineEntity instances of the app-attempts @@ -2332,13 +2379,16 @@ public TimelineEntity getApp( @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("fromidprefix") String fromIdPrefix, + @QueryParam("fromid") String fromId) { return getEntities(req, res, clusterId, appId, TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(), userId, flowName, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, - confsToRetrieve, metricsToRetrieve, fields, metricsLimit); + confsToRetrieve, metricsToRetrieve, fields, metricsLimit, + fromIdPrefix, fromId); } /** @@ -2381,6 +2431,7 @@ public TimelineEntity getApp( * have to be retrieved, then metricsLimit will be considered as 1 * i.e. latest single value of metric(s) will be returned. (Optional * query param). + * @param entityIdPrefix If specified, then entity retrieval is faster. * * @return If successful, a HTTP 200(OK) response having a JSON representing a * TimelineEntity instance is returned.
@@ -2405,9 +2456,11 @@ public TimelineEntity getAppAttempt(@Context HttpServletRequest req, @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("entityidprefix") String entityIdPrefix) { return getAppAttempt(req, res, null, appId, appAttemptId, userId, flowName, - flowRunId, confsToRetrieve, metricsToRetrieve, fields, metricsLimit); + flowRunId, confsToRetrieve, metricsToRetrieve, fields, metricsLimit, + entityIdPrefix); } /** @@ -2450,6 +2503,7 @@ public TimelineEntity getAppAttempt(@Context HttpServletRequest req, * have to be retrieved, then metricsLimit will be considered as 1 * i.e. latest single value of metric(s) will be returned. (Optional * query param). + * @param entityIdPrefix If specified, then entity retrieval is faster. * * @return If successful, a HTTP 200(OK) response having a JSON representing a * TimelineEntity instance is returned.
@@ -2476,11 +2530,12 @@ public TimelineEntity getAppAttempt(@Context HttpServletRequest req, @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("entityidprefix") String entityIdPrefix) { return getEntity(req, res, clusterId, appId, TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(), appAttemptId, userId, flowName, flowRunId, confsToRetrieve, metricsToRetrieve, fields, - metricsLimit); + metricsLimit, entityIdPrefix); } /** @@ -2553,6 +2608,11 @@ public TimelineEntity getAppAttempt(@Context HttpServletRequest req, * have to be retrieved, then metricsLimit will be considered as 1 * i.e. latest single value of metric(s) will be returned. (Optional * query param). + * @param fromIdPrefix If specified, then retrieve entities earlier than and + * including the specified ID. If it is null, retrieved entities are + * always first 'N' rows in the back end storage where N is limit. + * @param fromId If specified, then fromIdPrefix mandatory. Otherwise fromId + * is not accountable. * * @return If successful, a HTTP 200(OK) response having a JSON representing a * set of TimelineEntity instances of the containers @@ -2586,11 +2646,14 @@ public TimelineEntity getAppAttempt(@Context HttpServletRequest req, @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("fromidprefix") String fromIdPrefix, + @QueryParam("fromid") String fromId) { return getContainers(req, res, null, appId, appattemptId, userId, flowName, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, - confsToRetrieve, metricsToRetrieve, fields, metricsLimit); + confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromIdPrefix, + fromId); } /** @@ -2664,6 +2727,11 @@ public TimelineEntity getAppAttempt(@Context HttpServletRequest req, * have to be retrieved, then metricsLimit will be considered as 1 * i.e. latest single value of metric(s) will be returned. (Optional * query param). + * @param fromIdPrefix If specified, then retrieve entities earlier than and + * including the specified ID. If it is null, retrieved entities are + * always first 'N' rows in the back end storage where N is limit. + * @param fromId If specified, then fromIdPrefix mandatory. Otherwise fromId + * is not accountable. * * @return If successful, a HTTP 200(OK) response having a JSON representing a * set of TimelineEntity instances of the containers @@ -2699,7 +2767,9 @@ public TimelineEntity getAppAttempt(@Context HttpServletRequest req, @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("fromidprefix") String fromIdPrefix, + @QueryParam("fromid") String fromId) { String entityType = TimelineEntityType.YARN_CONTAINER.toString(); String parentEntityType = @@ -2717,7 +2787,8 @@ public TimelineEntity getAppAttempt(@Context HttpServletRequest req, return getEntities(req, res, clusterId, appId, entityType, userId, flowName, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, infofilter, conffilters, metricfilters, eventfilters, - confsToRetrieve, metricsToRetrieve, fields, metricsLimit); + confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromIdPrefix, + fromId); } /** @@ -2759,6 +2830,7 @@ public TimelineEntity getAppAttempt(@Context HttpServletRequest req, * have to be retrieved, then metricsLimit will be considered as 1 * i.e. latest single value of metric(s) will be returned. (Optional * query param). + * @param entityIdPrefix If specified, then entity retrieval is faster. * * @return If successful, a HTTP 200(OK) response having a JSON representing * TimelineEntity instance is returned.
@@ -2783,9 +2855,11 @@ public TimelineEntity getContainer(@Context HttpServletRequest req, @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("entityidprefix") String entityIdPrefix) { return getContainer(req, res, null, appId, containerId, userId, flowName, - flowRunId, confsToRetrieve, metricsToRetrieve, fields, metricsLimit); + flowRunId, confsToRetrieve, metricsToRetrieve, fields, metricsLimit, + entityIdPrefix); } /** @@ -2828,6 +2902,7 @@ public TimelineEntity getContainer(@Context HttpServletRequest req, * have to be retrieved, then metricsLimit will be considered as 1 * i.e. latest single value of metric(s) will be returned. (Optional * query param). + * @param entityIdPrefix If specified, then entity retrieval is faster. * * @return If successful, a HTTP 200(OK) response having a JSON representing a * TimelineEntity instance is returned.
@@ -2854,11 +2929,12 @@ public TimelineEntity getContainer(@Context HttpServletRequest req, @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("entityidprefix") String entityIdPrefix) { return getEntity(req, res, clusterId, appId, TimelineEntityType.YARN_CONTAINER.toString(), containerId, userId, flowName, flowRunId, confsToRetrieve, metricsToRetrieve, fields, - metricsLimit); + metricsLimit, entityIdPrefix); } /** @@ -2953,7 +3029,7 @@ public TimelineEntity getContainer(@Context HttpServletRequest req, results = timelineReaderManager.getEntityTypes( TimelineReaderWebServicesUtils.createTimelineReaderContext( clusterId, userId, flowName, flowRunId, appId, - null, null)); + null, null, null)); } catch (Exception e) { handleException(e, url, startTime, "flowrunid"); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServicesUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServicesUtils.java index 7fc8cb8..68cb48f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServicesUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServicesUtils.java @@ -23,6 +23,7 @@ import javax.servlet.http.HttpServletRequest; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; @@ -49,10 +50,10 @@ private TimelineReaderWebServicesUtils() { */ static TimelineReaderContext createTimelineReaderContext(String clusterId, String userId, String flowName, String flowRunId, String appId, - String entityType, String entityId) { + String entityType, String entityIdPrefix, String entityId) { return new TimelineReaderContext(parseStr(clusterId), parseStr(userId), parseStr(flowName), parseLongStr(flowRunId), parseStr(appId), - parseStr(entityType), parseStr(entityId)); + parseStr(entityType), parseLongStr(entityIdPrefix), parseStr(entityId)); } /** @@ -73,12 +74,14 @@ static TimelineReaderContext createTimelineReaderContext(String clusterId, static TimelineEntityFilters createTimelineEntityFilters(String limit, String createdTimeStart, String createdTimeEnd, String relatesTo, String isRelatedTo, String infofilters, String conffilters, - String metricfilters, String eventfilters) throws TimelineParseException { + String metricfilters, String eventfilters, String fromidprefix, + String fromid) throws TimelineParseException { return new TimelineEntityFilters(parseLongStr(limit), parseLongStr(createdTimeStart), parseLongStr(createdTimeEnd), parseRelationFilters(relatesTo), parseRelationFilters(isRelatedTo), parseKVFilters(infofilters, false), parseKVFilters(conffilters, true), - parseMetricFilters(metricfilters), parseEventFilters(eventfilters)); + parseMetricFilters(metricfilters), parseEventFilters(eventfilters), + parseLongStr(fromidprefix), StringUtils.trimToNull(fromid)); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineUIDConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineUIDConverter.java index 08e5405..780cfd0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineUIDConverter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineUIDConverter.java @@ -155,12 +155,14 @@ String encodeUID(TimelineReaderContext context) { // Flow information exists. String[] entityTupleArr = {context.getClusterId(), context.getUserId(), context.getFlowName(), context.getFlowRunId().toString(), - context.getAppId(), context.getEntityType(), context.getEntityId()}; + context.getAppId(), context.getEntityType(), + context.getEntityIdPrefix().toString(), context.getEntityId() }; return joinAndEscapeUIDParts(entityTupleArr); } else { // Only entity and app information exists. Flow info does not exist. String[] entityTupleArr = {context.getClusterId(), context.getAppId(), - context.getEntityType(), context.getEntityId()}; + context.getEntityType(), context.getEntityIdPrefix().toString(), + context.getEntityId() }; return joinAndEscapeUIDParts(entityTupleArr); } } @@ -171,20 +173,21 @@ TimelineReaderContext decodeUID(String uId) throws Exception { return null; } List entityTupleList = splitUID(uId); - // Should have 7 parts i.e. cluster, user, flow name, flowrun id, app id, - // entity type and entity id OR should have 4 parts i.e. cluster, app id, + // Should have 8 parts i.e. cluster, user, flow name, flowrun id, app id, + // entity type and entity id OR should have 5 parts i.e. cluster, app id, // entity type and entity id. - if (entityTupleList.size() == 7) { + if (entityTupleList.size() == 8) { // Flow information exists. return new TimelineReaderContext(entityTupleList.get(0), entityTupleList.get(1), entityTupleList.get(2), Long.parseLong(entityTupleList.get(3)), entityTupleList.get(4), - entityTupleList.get(5), entityTupleList.get(6)); - } else if (entityTupleList.size() == 4) { + entityTupleList.get(5), Long.parseLong(entityTupleList.get(6)), + entityTupleList.get(7)); + } else if (entityTupleList.size() == 5) { // Flow information does not exist. return new TimelineReaderContext(entityTupleList.get(0), null, null, null, entityTupleList.get(1), entityTupleList.get(2), - entityTupleList.get(3)); + Long.parseLong(entityTupleList.get(3)), entityTupleList.get(4)); } else { return null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java index cccae26..cb5bfc3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java @@ -181,6 +181,22 @@ private static CompareOp getHBaseCompareOp( } /** + * Creates a HBase {@link SingleColumnValueFilter} with specified column. + * @param column Column which value to be filtered. + * @param value Value to be filtered. + * @param op Compare operator + * @return a {@link SingleColumnValueFilter} object + * @throws IOException + */ + public static Filter createHBaseSingleColValueFilter(Column column, + Object value, CompareOp op) throws IOException { + Filter singleColValFilter = createHBaseSingleColValueFilter( + column.getColumnFamilyBytes(), column.getColumnQualifierBytes(), + column.getValueConverter().encodeValue(value), op, true); + return singleColValFilter; + } + + /** * Creates a HBase {@link SingleColumnValueFilter}. * * @param columnFamily Column Family represented as bytes. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java index d7c1552..64c1407 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java @@ -125,8 +125,8 @@ TimelineEntity getEntity(TimelineReaderContext context, *
  • flowRunId - Context flow run id.
  • *
  • appId - Context app id.
  • * - * Although entityId is also part of context, it has no meaning for - * getEntities.
    + * EntityId and EntityIdPrefix are used when pagination mode is enabled + * using filter fromId.
    * Fields in context which are mandatory depends on entity type. Entity * type is always mandatory. In addition to entity type, below is the list * of context fields which are mandatory, based on entity type.
    @@ -161,8 +161,9 @@ TimelineEntity getEntity(TimelineReaderContext context, * {@link TimelineDataToRetrieve} for details. * @return A set of TimelineEntity instances of the given entity * type in the given context scope which matches the given predicates - * ordered by created time, descending. Each entity will only contain the - * metadata(id, type and created time) plus the given fields to retrieve. + * ordered by enitityIdPrefix, descending. Each entity will only contain + * the metadata(id, type , idPrefix and created time) plus the given + * fields to retrieve. *
    * If entityType is YARN_FLOW_ACTIVITY, entities returned are of type * FlowActivityEntity.
    diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java index 10aeec4..a8f1d0c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java @@ -33,13 +33,13 @@ private final Long flowRunId; private final String appId; private final String entityType; - private final long entityIdPrefix; + private final Long entityIdPrefix; private final String entityId; private final KeyConverter entityRowKeyConverter = new EntityRowKeyConverter(); public EntityRowKey(String clusterId, String userId, String flowName, - Long flowRunId, String appId, String entityType, long entityIdPrefix, + Long flowRunId, String appId, String entityType, Long entityIdPrefix, String entityId) { this.clusterId = clusterId; this.userId = userId; @@ -79,7 +79,7 @@ public String getEntityId() { return entityId; } - public long getEntityIdPrefix() { + public Long getEntityIdPrefix() { return entityIdPrefix; } @@ -180,14 +180,24 @@ private EntityRowKeyConverter() { Separator.encode(rowKey.getEntityType(), Separator.SPACE, Separator.TAB, Separator.QUALIFIERS); + if (rowKey.getEntityIdPrefix() == null) { + return Separator.QUALIFIERS.join(first, second, third, entityType, + Separator.EMPTY_BYTES); + } + byte[] enitityIdPrefix = Bytes.toBytes(rowKey.getEntityIdPrefix()); - byte[] entityId = - rowKey.getEntityId() == null ? Separator.EMPTY_BYTES : Separator - .encode(rowKey.getEntityId(), Separator.SPACE, Separator.TAB, - Separator.QUALIFIERS); + if (rowKey.getEntityId() == null) { + return Separator.QUALIFIERS.join(first, second, third, entityType, + enitityIdPrefix, Separator.EMPTY_BYTES); + } + + byte[] entityId = Separator.encode(rowKey.getEntityId(), Separator.SPACE, + Separator.TAB, Separator.QUALIFIERS); + byte[] fourth = Separator.QUALIFIERS.join(entityType, enitityIdPrefix, entityId); + return Separator.QUALIFIERS.join(first, second, third, fourth); } @@ -227,7 +237,7 @@ public EntityRowKey decode(byte[] rowKey) { Separator.decode(Bytes.toString(rowKeyComponents[5]), Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); - long entityPrefixId = Bytes.toLong(rowKeyComponents[6]); + Long entityPrefixId = Bytes.toLong(rowKeyComponents[6]); String entityId = Separator.decode(Bytes.toString(rowKeyComponents[7]), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java index ef717c0..067a91a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java @@ -31,18 +31,21 @@ * Creates a prefix which generates the following rowKeyPrefixes for the * entity table: * {@code userName!clusterId!flowName!flowRunId!AppId!entityType!}. - * + * * @param clusterId identifying the cluster * @param userId identifying the user * @param flowName identifying the flow * @param flowRunId identifying the individual run of this flow * @param appId identifying the application * @param entityType which entity type + * @param entityIdPrefix for entityId + * @param entityId for an entity */ public EntityRowKeyPrefix(String clusterId, String userId, String flowName, - Long flowRunId, String appId, String entityType) { - // TODO YARN-5585, change prefix id from 0L - super(clusterId, userId, flowName, flowRunId, appId, entityType, 0L, null); + Long flowRunId, String appId, String entityType, Long entityIdPrefix, + String entityId) { + super(clusterId, userId, flowName, flowRunId, appId, entityType, + entityIdPrefix, entityId); } /** @@ -58,8 +61,7 @@ public EntityRowKeyPrefix(String clusterId, String userId, String flowName, */ public EntityRowKeyPrefix(String clusterId, String userId, String flowName, Long flowRunId, String appId) { - // TODO YARN-5585, change prefix id from 0L - super(clusterId, userId, flowName, flowRunId, appId, null, 0L, null); + this(clusterId, userId, flowName, flowRunId, appId, null, null, null); } /* diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java index 42a6aa8..1667f61 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java @@ -64,7 +64,7 @@ public ApplicationEntityReader(TimelineReaderContext ctxt, TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { - super(ctxt, entityFilters, toRetrieve, true); + super(ctxt, entityFilters, toRetrieve); } public ApplicationEntityReader(TimelineReaderContext ctxt, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java index 9ba5e38..c741d0e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java @@ -60,7 +60,7 @@ public FlowActivityEntityReader(TimelineReaderContext ctxt, TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { - super(ctxt, entityFilters, toRetrieve, true); + super(ctxt, entityFilters, toRetrieve); } public FlowActivityEntityReader(TimelineReaderContext ctxt, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java index 986a28f..9b8482c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java @@ -63,7 +63,7 @@ public FlowRunEntityReader(TimelineReaderContext ctxt, TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { - super(ctxt, entityFilters, toRetrieve, true); + super(ctxt, entityFilters, toRetrieve); } public FlowRunEntityReader(TimelineReaderContext ctxt, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java index 0b3f7df..e09d661 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.EnumSet; +import java.util.Iterator; import java.util.Map; import java.util.Set; @@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.FamilyFilter; import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.hbase.filter.QualifierFilter; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; @@ -43,6 +45,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; @@ -70,9 +73,8 @@ new StringKeyConverter(); public GenericEntityReader(TimelineReaderContext ctxt, - TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve, - boolean sortedKeys) { - super(ctxt, entityFilters, toRetrieve, sortedKeys); + TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { + super(ctxt, entityFilters, toRetrieve); } public GenericEntityReader(TimelineReaderContext ctxt, @@ -424,18 +426,45 @@ protected void augmentParams(Configuration hbaseConf, Connection conn) protected Result getResult(Configuration hbaseConf, Connection conn, FilterList filterList) throws IOException { TimelineReaderContext context = getContext(); - byte[] rowKey = - new EntityRowKey(context.getClusterId(), context.getUserId(), - context.getFlowName(), context.getFlowRunId(), context.getAppId(), - // TODO YARN-5585, change prefix id from 0L - context.getEntityType(), 0L, context.getEntityId()).getRowKey(); - - Get get = new Get(rowKey); - get.setMaxVersions(getDataToRetrieve().getMetricsLimit()); - if (filterList != null && !filterList.getFilters().isEmpty()) { - get.setFilter(filterList); + Result result = null; + if (context.getEntityIdPrefix() != null) { + byte[] rowKey = new EntityRowKey(context.getClusterId(), + context.getUserId(), context.getFlowName(), context.getFlowRunId(), + context.getAppId(), context.getEntityType(), + context.getEntityIdPrefix(), context.getEntityId()).getRowKey(); + + Get get = new Get(rowKey); + get.setMaxVersions(getDataToRetrieve().getMetricsLimit()); + if (filterList != null && !filterList.getFilters().isEmpty()) { + get.setFilter(filterList); + } + result = getTable().getResult(hbaseConf, conn, get); + + } else { + // Prepare of range scan + // create single SingleColumnValueFilter and add to existing filters. + FilterList filter = new FilterList(Operator.MUST_PASS_ALL); + if (filterList != null && !filterList.getFilters().isEmpty()) { + filter.addFilter(filterList); + } + FilterList newFilter = new FilterList(); + newFilter.addFilter(TimelineFilterUtils.createHBaseSingleColValueFilter( + EntityColumn.ID, context.getEntityId(), CompareOp.EQUAL)); + newFilter.addFilter(new PageFilter(2)); + + filter.addFilter(newFilter); + + ResultScanner results = getResults(hbaseConf, conn, filter); + Iterator iterator = results.iterator(); + if (iterator.hasNext()) { + result = iterator.next(); + if (iterator.hasNext()) { + throw new IOException( + "Same entityId is found with multiple idprefix."); + } + } } - return getTable().getResult(hbaseConf, conn, get); + return result; } @Override @@ -445,11 +474,36 @@ protected ResultScanner getResults(Configuration hbaseConf, Connection conn, // and one type Scan scan = new Scan(); TimelineReaderContext context = getContext(); - RowKeyPrefix entityRowKeyPrefix = - new EntityRowKeyPrefix(context.getClusterId(), context.getUserId(), - context.getFlowName(), context.getFlowRunId(), context.getAppId(), - context.getEntityType()); - scan.setRowPrefixFilter(entityRowKeyPrefix.getRowKeyPrefix()); + RowKeyPrefix entityRowKeyPrefix = null; + // default mode, will always scans from beginning of entity type. + if (getFilters() == null || getFilters().getFromIdPrefix() == null) { + entityRowKeyPrefix = new EntityRowKeyPrefix(context.getClusterId(), + context.getUserId(), context.getFlowName(), context.getFlowRunId(), + context.getAppId(), context.getEntityType(), null, null); + scan.setRowPrefixFilter(entityRowKeyPrefix.getRowKeyPrefix()); + } else { // pagination mode, will scan from given entityPrefix!enitityId + entityRowKeyPrefix = new EntityRowKeyPrefix(context.getClusterId(), + context.getUserId(), context.getFlowName(), context.getFlowRunId(), + context.getAppId(), context.getEntityType(), + getFilters().getFromIdPrefix(), getFilters().getFromId()); + + // set start row + scan.setStartRow(entityRowKeyPrefix.getRowKeyPrefix()); + + // get the bytes for stop row + entityRowKeyPrefix = new EntityRowKeyPrefix(context.getClusterId(), + context.getUserId(), context.getFlowName(), context.getFlowRunId(), + context.getAppId(), context.getEntityType(), null, null); + + // set stop row + scan.setStopRow( + HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix( + entityRowKeyPrefix.getRowKeyPrefix())); + + // set page filter to limit. This filter has to set only in pagination + // mode. + filterList.addFilter(new PageFilter(getFilters().getLimit())); + } scan.setMaxVersions(getDataToRetrieve().getMetricsLimit()); if (filterList != null && !filterList.getFilters().isEmpty()) { scan.setFilter(filterList); @@ -463,10 +517,10 @@ protected TimelineEntity parseEntity(Result result) throws IOException { return null; } TimelineEntity entity = new TimelineEntity(); - String entityType = EntityColumn.TYPE.readResult(result).toString(); - entity.setType(entityType); - String entityId = EntityColumn.ID.readResult(result).toString(); - entity.setId(entityId); + EntityRowKey parseRowKey = EntityRowKey.parseRowKey(result.getRow()); + entity.setType(parseRowKey.getEntityType()); + entity.setId(parseRowKey.getEntityId()); + entity.setIdPrefix(parseRowKey.getEntityIdPrefix().longValue()); TimelineEntityFilters filters = getFilters(); // fetch created time diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java index 8e3a357..9fd39a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java @@ -21,6 +21,7 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.Map; import java.util.NavigableMap; import java.util.NavigableSet; @@ -75,14 +76,6 @@ private BaseTable table; /** - * Specifies whether keys for this table are sorted in a manner where entities - * can be retrieved by created time. If true, it will be sufficient to collect - * the first results as specified by the limit. Otherwise all matched entities - * will be fetched and then limit applied. - */ - private boolean sortedKeys = false; - - /** * Used to convert strings key components to and from storage format. */ private final KeyConverter stringKeyConverter = @@ -95,15 +88,11 @@ * made. * @param entityFilters Filters which limit the entities returned. * @param toRetrieve Data to retrieve for each entity. - * @param sortedKeys Specifies whether key for this table are sorted or not. - * If sorted, entities can be retrieved by created time. */ protected TimelineEntityReader(TimelineReaderContext ctxt, - TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve, - boolean sortedKeys) { + TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { super(ctxt); this.singleEntityRead = false; - this.sortedKeys = sortedKeys; this.dataToRetrieve = toRetrieve; this.filters = entityFilters; @@ -245,7 +234,7 @@ public TimelineEntity readEntity(Configuration hbaseConf, Connection conn) validateParams(); augmentParams(hbaseConf, conn); - NavigableSet entities = new TreeSet<>(); + Set entities = new LinkedHashSet<>(); FilterList filterList = createFilterList(); if (LOG.isDebugEnabled() && filterList != null) { LOG.debug("FilterList created for scan is - " + filterList); @@ -258,14 +247,8 @@ public TimelineEntity readEntity(Configuration hbaseConf, Connection conn) continue; } entities.add(entity); - if (!sortedKeys) { - if (entities.size() > filters.getLimit()) { - entities.pollLast(); - } - } else { - if (entities.size() == filters.getLimit()) { - break; - } + if (entities.size() == filters.getLimit()) { + break; } } return entities; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java index e90338e..16fffa4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java @@ -83,7 +83,7 @@ public static TimelineEntityReader createMultipleEntitiesReader( return new FlowRunEntityReader(context, filters, dataToRetrieve); } else { // assume we're dealing with a generic entity read - return new GenericEntityReader(context, filters, dataToRetrieve, false); + return new GenericEntityReader(context, filters, dataToRetrieve); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesUtils.java index b2837c2..1043368 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesUtils.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.timelineservice.reader; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineUIDConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineUIDConverter.java index d5e791b..11dc913 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineUIDConverter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineUIDConverter.java @@ -53,19 +53,19 @@ public void testUIDEncodingDecoding() throws Exception { assertEquals(context, TimelineUIDConverter.APPLICATION_UID.decodeUID(uid)); context = new TimelineReaderContext("yarn_cluster", "root", "hive_join", - 1234L, "application_1111111111_1111", "YARN_CONTAINER", + 1234L, "application_1111111111_1111", "YARN_CONTAINER", 12345L, "container_1111111111_1111_01_000001"); uid = TimelineUIDConverter.GENERIC_ENTITY_UID.encodeUID(context); assertEquals("yarn_cluster!root!hive_join!1234!application_1111111111_1111!" - + "YARN_CONTAINER!container_1111111111_1111_01_000001", uid); + + "YARN_CONTAINER!12345!container_1111111111_1111_01_000001", uid); assertEquals( context, TimelineUIDConverter.GENERIC_ENTITY_UID.decodeUID(uid)); context = new TimelineReaderContext("yarn_cluster", null, null, null, - "application_1111111111_1111", "YARN_CONTAINER", + "application_1111111111_1111", "YARN_CONTAINER", 54321L, "container_1111111111_1111_01_000001"); uid = TimelineUIDConverter.GENERIC_ENTITY_UID.encodeUID(context); assertEquals("yarn_cluster!application_1111111111_1111!YARN_CONTAINER!" + - "container_1111111111_1111_01_000001", uid); + "54321!container_1111111111_1111_01_000001", uid); assertEquals( context, TimelineUIDConverter.GENERIC_ENTITY_UID.decodeUID(uid)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java index 6c6d1b3..7560f33 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java @@ -139,6 +139,7 @@ public void testEntityRowKey() { TimelineEntity entity = new TimelineEntity(); entity.setId("!ent!ity!!id!"); entity.setType("entity!Type"); + entity.setIdPrefix(54321); byte[] byteRowKey = new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID, @@ -151,11 +152,13 @@ public void testEntityRowKey() { assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId()); assertEquals(APPLICATION_ID, rowKey.getAppId()); assertEquals(entity.getType(), rowKey.getEntityType()); + assertEquals(entity.getIdPrefix(), rowKey.getEntityIdPrefix().longValue()); assertEquals(entity.getId(), rowKey.getEntityId()); byte[] byteRowKeyPrefix = new EntityRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, - APPLICATION_ID, entity.getType()).getRowKeyPrefix(); + APPLICATION_ID, entity.getType(), null, null) + .getRowKeyPrefix(); byte[][] splits = Separator.QUALIFIERS.split( byteRowKeyPrefix, @@ -163,8 +166,7 @@ public void testEntityRowKey() { Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE }); - assertEquals(8, splits.length); - assertEquals(entity.getIdPrefix(), splits[7].length); + assertEquals(7, splits.length); assertEquals(APPLICATION_ID, new AppIdKeyConverter().decode(splits[4])); assertEquals(entity.getType(), Separator.QUALIFIERS.decode(Bytes.toString(splits[5])));