diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java index 9c0a983..a9d530b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java @@ -145,6 +145,7 @@ public boolean equals(Object obj) { private HashMap> isRelatedToEntities = new HashMap<>(); private HashMap> relatesToEntities = new HashMap<>(); private Long createdTime; + private Long idPrefix; public TimelineEntity() { identifier = new Identifier(); @@ -581,4 +582,22 @@ public String toString() { return real.toString(); } } + + @XmlElement(name = "idprefix") + public Long getIdPrefix() { + if (real == null) { + return idPrefix; + } else { + return real.getIdPrefix(); + } + } + + @JsonSetter("idprefix") + public void setIdPrefix(Long entityIdPrefix) { + if (real == null) { + this.idPrefix = entityIdPrefix; + } else { + real.setIdPrefix(entityIdPrefix); + } + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntityFilters.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntityFilters.java index 8f2b725..a3eba86 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntityFilters.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntityFilters.java @@ -115,6 +115,7 @@ private TimelineFilterList eventFilters; private static final long DEFAULT_BEGIN_TIME = 0L; private static final long DEFAULT_END_TIME = Long.MAX_VALUE; + private String fromId; /** * Default limit of number of entities to return for getEntities API. @@ -239,4 +240,12 @@ public TimelineFilterList getEventFilters() { public void setEventFilters(TimelineFilterList filters) { this.eventFilters = filters; } + + public String getFromId() { + return fromId; + } + + public void setFromId(String fromId) { + this.fromId = fromId; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderContext.java index 633bb23..c8a96cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderContext.java @@ -31,6 +31,7 @@ private String entityType; private String entityId; + private Long entityIdPrefix; public TimelineReaderContext(String clusterId, String userId, String flowName, Long flowRunId, String appId, String entityType, String entityId) { super(clusterId, userId, flowName, flowRunId, appId); @@ -38,10 +39,19 @@ public TimelineReaderContext(String clusterId, String userId, String flowName, this.entityId = entityId; } + public TimelineReaderContext(String clusterId, String userId, String flowName, + Long flowRunId, String appId, String entityType, Long entityIdPrefix, + String entityId) { + super(clusterId, userId, flowName, flowRunId, appId); + this.entityType = entityType; + this.entityId = entityId; + this.entityIdPrefix = entityIdPrefix; + } + public TimelineReaderContext(TimelineReaderContext other) { this(other.getClusterId(), other.getUserId(), other.getFlowName(), other.getFlowRunId(), other.getAppId(), other.getEntityType(), - other.getEntityId()); + other.getEntityIdPrefix(), other.getEntityId()); } @Override @@ -95,4 +105,12 @@ public String getEntityId() { public void setEntityId(String id) { this.entityId = id; } + + public Long getEntityIdPrefix() { + return entityIdPrefix; + } + + public void setEntityIdPrefix(Long entityIdPrefix) { + this.entityIdPrefix = entityIdPrefix; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java index fcab78c..da1129b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java @@ -559,14 +559,26 @@ public TimelineAbout about( TimelineReaderManager timelineReaderManager = getTimelineReaderManager(); Set entities = null; try { - entities = timelineReaderManager.getEntities( - TimelineReaderWebServicesUtils.createTimelineReaderContext( - clusterId, userId, flowName, flowRunId, appId, entityType, null), - TimelineReaderWebServicesUtils.createTimelineEntityFilters( - limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, - infofilters, conffilters, metricfilters, eventfilters), + // TODO need to add as query params + String entityIdPrefix = null; + + TimelineReaderContext timelineReaderContext = + TimelineReaderWebServicesUtils.createTimelineReaderContext(clusterId, + userId, flowName, flowRunId, appId, entityType, entityIdPrefix, + null); + + TimelineEntityFilters timelineEntityFilters = + TimelineReaderWebServicesUtils.createTimelineEntityFilters(limit, + createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, + infofilters, conffilters, metricfilters, eventfilters); + + // TODO need to add as query params + timelineEntityFilters.setFromId(null); + + entities = timelineReaderManager.getEntities(timelineReaderContext, + timelineEntityFilters, TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( - confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); + confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); } catch (Exception e) { handleException(e, url, startTime, "createdTime start/end or limit or flowrunid"); @@ -816,7 +828,8 @@ public TimelineEntity getEntity( try { entity = timelineReaderManager.getEntity( TimelineReaderWebServicesUtils.createTimelineReaderContext( - clusterId, userId, flowName, flowRunId, appId, entityType, entityId), + clusterId, userId, flowName, flowRunId, appId, entityType, null, + entityId), TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); } catch (Exception e) { @@ -992,7 +1005,7 @@ public TimelineEntity getFlowRun( entity = timelineReaderManager.getEntity( TimelineReaderWebServicesUtils.createTimelineReaderContext( clusterId, userId, flowName, flowRunId, null, - TimelineEntityType.YARN_FLOW_RUN.toString(), null), + TimelineEntityType.YARN_FLOW_RUN.toString(), null, null), TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( null, metricsToRetrieve, null, null)); } catch (Exception e) { @@ -1217,7 +1230,7 @@ public TimelineEntity getFlowRun( entities = timelineReaderManager.getEntities( TimelineReaderWebServicesUtils.createTimelineReaderContext( clusterId, userId, flowName, null, null, - TimelineEntityType.YARN_FLOW_RUN.toString(), null), + TimelineEntityType.YARN_FLOW_RUN.toString(), null, null), TimelineReaderWebServicesUtils.createTimelineEntityFilters( limit, createdTimeStart, createdTimeEnd, null, null, null, null, null, null), @@ -1345,7 +1358,7 @@ public TimelineEntity getFlowRun( entities = timelineReaderManager.getEntities( TimelineReaderWebServicesUtils.createTimelineReaderContext( clusterId, null, null, null, null, - TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null), + TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null, null), entityFilters, TimelineReaderWebServicesUtils. createTimelineDataToRetrieve(null, null, null, null)); } catch (Exception e) { @@ -1584,7 +1597,7 @@ public TimelineEntity getApp( entity = timelineReaderManager.getEntity( TimelineReaderWebServicesUtils.createTimelineReaderContext( clusterId, userId, flowName, flowRunId, appId, - TimelineEntityType.YARN_APPLICATION.toString(), null), + TimelineEntityType.YARN_APPLICATION.toString(), null, null), TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); } catch (Exception e) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServicesUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServicesUtils.java index 7fc8cb8..fcc761c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServicesUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServicesUtils.java @@ -49,10 +49,10 @@ private TimelineReaderWebServicesUtils() { */ static TimelineReaderContext createTimelineReaderContext(String clusterId, String userId, String flowName, String flowRunId, String appId, - String entityType, String entityId) { + String entityType, String entityIdPrefix, String entityId) { return new TimelineReaderContext(parseStr(clusterId), parseStr(userId), parseStr(flowName), parseLongStr(flowRunId), parseStr(appId), - parseStr(entityType), parseStr(entityId)); + parseStr(entityType), parseLongStr(entityIdPrefix), parseStr(entityId)); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index 3511a2f..306b6f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -159,7 +159,7 @@ public TimelineWriteResponse write(String clusterId, String userId, } else { EntityRowKey entityRowKey = new EntityRowKey(clusterId, userId, flowName, flowRunId, appId, - te.getType(), te.getId()); + te.getType(), te.getIdPrefix(), te.getId()); rowKey = entityRowKey.getRowKey(); } @@ -364,6 +364,7 @@ private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion, EntityColumn.TYPE.store(rowKey, entityTable, null, te.getType()); EntityColumn.CREATED_TIME.store(rowKey, entityTable, null, te.getCreatedTime()); + EntityColumn.ID_PREFIX.store(rowKey, entityTable, null, te.getIdPrefix()); EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion); Map info = te.getInfo(); if (info != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java index 93b4b36..c475635 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java @@ -52,6 +52,11 @@ CREATED_TIME(EntityColumnFamily.INFO, "created_time", new LongConverter()), /** + * prefix for sorting row-keys. + */ + ID_PREFIX(EntityColumnFamily.INFO, "id_prefix", new LongConverter()), + + /** * The version of the flow that this entity belongs to. */ FLOW_VERSION(EntityColumnFamily.INFO, "flow_version"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java index ff22178..37f1e16 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java @@ -33,18 +33,21 @@ private final Long flowRunId; private final String appId; private final String entityType; + private final Long entityIdPrefix; private final String entityId; private final KeyConverter entityRowKeyConverter = new EntityRowKeyConverter(); public EntityRowKey(String clusterId, String userId, String flowName, - Long flowRunId, String appId, String entityType, String entityId) { + Long flowRunId, String appId, String entityType, Long entityIdPrefix, + String entityId) { this.clusterId = clusterId; this.userId = userId; this.flowName = flowName; this.flowRunId = flowRunId; this.appId = appId; this.entityType = entityType; + this.entityIdPrefix = entityIdPrefix; this.entityId = entityId; } @@ -76,6 +79,10 @@ public String getEntityId() { return entityId; } + public Long getEntityIdPrefix() { + return entityIdPrefix; + } + /** * Constructs a row key for the entity table as follows: * {@code userName!clusterId!flowName!flowRunId!AppId!entityType!entityId}. @@ -126,7 +133,7 @@ private EntityRowKeyConverter() { private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE, - Separator.VARIABLE_SIZE }; + Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE }; /* * (non-Javadoc) @@ -172,11 +179,21 @@ private EntityRowKeyConverter() { byte[] entityType = Separator.encode(rowKey.getEntityType(), Separator.SPACE, Separator.TAB, Separator.QUALIFIERS); + + // Note that entityIdPrefix is a long, so we can't encode them all at the + // same time. It can not be null. If null, go with empty_bytes + byte[] enitityIdPrefix = Separator.EMPTY_BYTES; + if (rowKey.getEntityIdPrefix() != null) { + enitityIdPrefix = + Bytes.toBytes(LongConverter.invertLong(rowKey.getEntityIdPrefix())); + } + byte[] entityId = rowKey.getEntityId() == null ? Separator.EMPTY_BYTES : Separator .encode(rowKey.getEntityId(), Separator.SPACE, Separator.TAB, Separator.QUALIFIERS); - byte[] fourth = Separator.QUALIFIERS.join(entityType, entityId); + byte[] fourth = + Separator.QUALIFIERS.join(entityType, enitityIdPrefix, entityId); return Separator.QUALIFIERS.join(first, second, third, fourth); } @@ -196,7 +213,7 @@ private EntityRowKeyConverter() { public EntityRowKey decode(byte[] rowKey) { byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES); - if (rowKeyComponents.length != 7) { + if (rowKeyComponents.length != 8) { throw new IllegalArgumentException("the row key is not valid for " + "an entity"); } @@ -215,11 +232,18 @@ public EntityRowKey decode(byte[] rowKey) { String entityType = Separator.decode(Bytes.toString(rowKeyComponents[5]), Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); + + Long entityPrefixId = null; + if (rowKeyComponents[6].length != 0) { + entityPrefixId = + LongConverter.invertLong(Bytes.toLong(rowKeyComponents[6])); + } + String entityId = - Separator.decode(Bytes.toString(rowKeyComponents[6]), + Separator.decode(Bytes.toString(rowKeyComponents[7]), Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); return new EntityRowKey(clusterId, userId, flowName, flowRunId, appId, - entityType, entityId); + entityType, entityPrefixId, entityId); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java index 9146180..0f5d39d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java @@ -41,7 +41,14 @@ */ public EntityRowKeyPrefix(String clusterId, String userId, String flowName, Long flowRunId, String appId, String entityType) { - super(clusterId, userId, flowName, flowRunId, appId, entityType, null); + super(clusterId, userId, flowName, flowRunId, appId, entityType, null, + null); + } + + public EntityRowKeyPrefix(String clusterId, String userId, String flowName, + Long flowRunId, String appId, String entityType, Long entityIdPrefix) { + super(clusterId, userId, flowName, flowRunId, appId, entityType, + entityIdPrefix, null); } /** @@ -57,7 +64,7 @@ public EntityRowKeyPrefix(String clusterId, String userId, String flowName, */ public EntityRowKeyPrefix(String clusterId, String userId, String flowName, Long flowRunId, String appId) { - super(clusterId, userId, flowName, flowRunId, appId, null, null); + super(clusterId, userId, flowName, flowRunId, appId, null, null, null); } /* diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java index aa2bfda..fbc6d20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java @@ -65,7 +65,7 @@ public ApplicationEntityReader(TimelineReaderContext ctxt, TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { - super(ctxt, entityFilters, toRetrieve, true); + super(ctxt, entityFilters, toRetrieve); } public ApplicationEntityReader(TimelineReaderContext ctxt, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java index 9ba5e38..c741d0e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java @@ -60,7 +60,7 @@ public FlowActivityEntityReader(TimelineReaderContext ctxt, TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { - super(ctxt, entityFilters, toRetrieve, true); + super(ctxt, entityFilters, toRetrieve); } public FlowActivityEntityReader(TimelineReaderContext ctxt, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java index 986a28f..9b8482c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java @@ -63,7 +63,7 @@ public FlowRunEntityReader(TimelineReaderContext ctxt, TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { - super(ctxt, entityFilters, toRetrieve, true); + super(ctxt, entityFilters, toRetrieve); } public FlowRunEntityReader(TimelineReaderContext ctxt, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java index 4e1ab8a..cf21b5d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java @@ -18,11 +18,13 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.reader; import java.io.IOException; +import java.util.Arrays; import java.util.EnumSet; import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; @@ -79,9 +81,8 @@ new StringKeyConverter(); public GenericEntityReader(TimelineReaderContext ctxt, - TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve, - boolean sortedKeys) { - super(ctxt, entityFilters, toRetrieve, sortedKeys); + TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { + super(ctxt, entityFilters, toRetrieve); } public GenericEntityReader(TimelineReaderContext ctxt, @@ -502,7 +503,8 @@ protected Result getResult(Configuration hbaseConf, Connection conn, byte[] rowKey = new EntityRowKey(context.getClusterId(), context.getUserId(), context.getFlowName(), context.getFlowRunId(), context.getAppId(), - context.getEntityType(), context.getEntityId()).getRowKey(); + context.getEntityType(), context.getEntityIdPrefix(), + context.getEntityId()).getRowKey(); Get get = new Get(rowKey); get.setMaxVersions(getDataToRetrieve().getMetricsLimit()); if (filterList != null && !filterList.getFilters().isEmpty()) { @@ -518,11 +520,31 @@ protected ResultScanner getResults(Configuration hbaseConf, Connection conn, // and one type Scan scan = new Scan(); TimelineReaderContext context = getContext(); - RowKeyPrefix entityRowKeyPrefix = - new EntityRowKeyPrefix(context.getClusterId(), context.getUserId(), - context.getFlowName(), context.getFlowRunId(), context.getAppId(), - context.getEntityType()); - scan.setRowPrefixFilter(entityRowKeyPrefix.getRowKeyPrefix()); + RowKeyPrefix entityRowKeyPrefix = null; + if (context.getEntityIdPrefix() == null) { + entityRowKeyPrefix = new EntityRowKeyPrefix(context.getClusterId(), + context.getUserId(), context.getFlowName(), context.getFlowRunId(), + context.getAppId(), context.getEntityType()); + scan.setRowPrefixFilter(entityRowKeyPrefix.getRowKeyPrefix()); + } else { + entityRowKeyPrefix = + new EntityRowKeyPrefix(context.getClusterId(), context.getUserId(), + context.getFlowName(), context.getFlowRunId(), context.getAppId(), + context.getEntityType(), context.getEntityIdPrefix()); + + // set start row + scan.setStartRow(entityRowKeyPrefix.getRowKeyPrefix()); + + // get the bytes for stop row + entityRowKeyPrefix = new EntityRowKeyPrefix(context.getClusterId(), + context.getUserId(), context.getFlowName(), context.getFlowRunId(), + context.getAppId(), context.getEntityType()); + + // set stop row + scan.setStopRow(calculateTheClosestNextRowKeyForPrefix( + entityRowKeyPrefix.getRowKeyPrefix())); + } + scan.setMaxVersions(getDataToRetrieve().getMetricsLimit()); if (filterList != null && !filterList.getFilters().isEmpty()) { scan.setFilter(filterList); @@ -645,4 +667,30 @@ protected TimelineEntity parseEntity(Result result) throws IOException { entity.addInfo(columns); } } + + private byte[] calculateTheClosestNextRowKeyForPrefix(byte[] rowKeyPrefix) { + // Essentially we are treating it like an 'unsigned very very long' and + // doing +1 manually. + // Search for the place where the trailing 0xFFs start + int offset = rowKeyPrefix.length; + while (offset > 0) { + if (rowKeyPrefix[offset - 1] != (byte) 0xFF) { + break; + } + offset--; + } + + if (offset == 0) { + // We got an 0xFFFF... (only FFs) stopRow value which is + // the last possible prefix before the end of the table. + // So set it to stop at the 'end of the table' + return HConstants.EMPTY_END_ROW; + } + + // Copy the right length of the original + byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset); + // And increment the last one + newStopRow[newStopRow.length - 1]++; + return newStopRow; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java index 7b294a8..7ac7db5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java @@ -21,6 +21,7 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.Map; import java.util.NavigableMap; import java.util.NavigableSet; @@ -75,14 +76,6 @@ private BaseTable table; /** - * Specifies whether keys for this table are sorted in a manner where entities - * can be retrieved by created time. If true, it will be sufficient to collect - * the first results as specified by the limit. Otherwise all matched entities - * will be fetched and then limit applied. - */ - private boolean sortedKeys = false; - - /** * Used to convert strings key components to and from storage format. */ private final KeyConverter stringKeyConverter = @@ -99,10 +92,8 @@ * If sorted, entities can be retrieved by created time. */ protected TimelineEntityReader(TimelineReaderContext ctxt, - TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve, - boolean sortedKeys) { + TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { this.singleEntityRead = false; - this.sortedKeys = sortedKeys; this.context = ctxt; this.dataToRetrieve = toRetrieve; this.filters = entityFilters; @@ -249,27 +240,29 @@ public TimelineEntity readEntity(Configuration hbaseConf, Connection conn) validateParams(); augmentParams(hbaseConf, conn); - NavigableSet entities = new TreeSet<>(); + Set entities = new LinkedHashSet<>(); FilterList filterList = createFilterList(); if (LOG.isDebugEnabled() && filterList != null) { LOG.debug("FilterList created for scan is - " + filterList); } ResultScanner results = getResults(hbaseConf, conn, filterList); try { + String fromId = filters.getFromId(); + boolean isFound = false; for (Result result : results) { TimelineEntity entity = parseEntity(result); if (entity == null) { continue; } + + if (!isFound && fromId != null) { + isFound = fromId.equals(entity.getId()); + continue; + } + entities.add(entity); - if (!sortedKeys) { - if (entities.size() > filters.getLimit()) { - entities.pollLast(); - } - } else { - if (entities.size() == filters.getLimit()) { - break; - } + if (entities.size() == filters.getLimit()) { + break; } } return entities; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java index b2a9476..ea28eca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java @@ -83,7 +83,7 @@ public static TimelineEntityReader createMultipleEntitiesReader( return new FlowRunEntityReader(context, filters, dataToRetrieve); } else { // assume we're dealing with a generic entity read - return new GenericEntityReader(context, filters, dataToRetrieve, false); + return new GenericEntityReader(context, filters, dataToRetrieve); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java index 368b060..509205b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java @@ -139,7 +139,7 @@ public void testEntityRowKey() { String entityType = "entity!Type"; byte[] byteRowKey = new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID, - entityType, entityId).getRowKey(); + entityType, null, entityId).getRowKey(); EntityRowKey rowKey = EntityRowKey.parseRowKey(byteRowKey); assertEquals(CLUSTER, rowKey.getClusterId()); assertEquals(USER, rowKey.getUserId()); @@ -158,9 +158,9 @@ public void testEntityRowKey() { new int[] {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE, - Separator.VARIABLE_SIZE}); - assertEquals(7, splits.length); - assertEquals(0, splits[6].length); + Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE }); + assertEquals(8, splits.length); + assertEquals(0, splits[7].length); assertEquals(APPLICATION_ID, new AppIdKeyConverter().decode(splits[4])); assertEquals(entityType, Separator.QUALIFIERS.decode(Bytes.toString(splits[5])));