diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderContext.java
index 633bb23..31fd0e7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderContext.java
@@ -31,6 +31,7 @@
   private String entityType;
   private String entityId;
+  private long entityIdPrefix;
 
   public TimelineReaderContext(String clusterId, String userId, String flowName,
       Long flowRunId, String appId, String entityType, String entityId) {
     super(clusterId, userId, flowName, flowRunId, appId);
@@ -38,10 +39,17 @@ public TimelineReaderContext(String clusterId, String userId, String flowName,
     this.entityId = entityId;
   }
 
+  public TimelineReaderContext(String clusterId, String userId, String flowName,
+      Long flowRunId, String appId, String entityType, long entityIdPrefix,
+      String entityId) {
+    this(clusterId, userId, flowName, flowRunId, appId, entityType, entityId);
+    this.entityIdPrefix = entityIdPrefix;
+  }
+
   public TimelineReaderContext(TimelineReaderContext other) {
     this(other.getClusterId(), other.getUserId(), other.getFlowName(),
         other.getFlowRunId(), other.getAppId(), other.getEntityType(),
-        other.getEntityId());
+        other.getEntityIdPrefix(), other.getEntityId());
   }
 
   @Override
@@ -95,4 +103,12 @@ public String getEntityId() {
   public void setEntityId(String id) {
     this.entityId = id;
   }
+
+  public long getEntityIdPrefix() {
+    return entityIdPrefix;
+  }
+
+  public void setEntityIdPrefix(long entityIdPrefix) {
+    this.entityIdPrefix = entityIdPrefix;
+  }
 }
\ No newline at end of file
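The reader context now carries the numeric entity id prefix next to the entity id, and the copy constructor propagates it, so a context cloned from a decoded UID keeps its exact position in row-key order. A minimal sketch of the new 8-argument constructor, using invented cluster/flow/container ids:

```java
// Hypothetical ids, chosen only to illustrate the new constructor shape.
TimelineReaderContext ctx = new TimelineReaderContext("test-cluster", "user1",
    "flow_name", 1002345678919L, "application_1111111111_1111",
    "YARN_CONTAINER", 1234567890L, "container_1111111111_1111_01_000001");

// The copy constructor now carries the prefix across as well.
TimelineReaderContext copy = new TimelineReaderContext(ctx);
assert copy.getEntityIdPrefix() == 1234567890L;
```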
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
index db0c4e1..6ac9e29 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
@@ -561,7 +561,8 @@ public TimelineAbout about(
     try {
       entities = timelineReaderManager.getEntities(
           TimelineReaderWebServicesUtils.createTimelineReaderContext(
-              clusterId, userId, flowName, flowRunId, appId, entityType, null),
+              clusterId, userId, flowName, flowRunId, appId, entityType,
+              0L + "", null),
           TimelineReaderWebServicesUtils.createTimelineEntityFilters(
               limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
               infofilters, conffilters, metricfilters, eventfilters),
@@ -816,7 +817,8 @@ public TimelineEntity getEntity(
     try {
       entity = timelineReaderManager.getEntity(
           TimelineReaderWebServicesUtils.createTimelineReaderContext(
-              clusterId, userId, flowName, flowRunId, appId, entityType, entityId),
+              clusterId, userId, flowName, flowRunId, appId, entityType,
+              0L + "", entityId),
           TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
               confsToRetrieve, metricsToRetrieve, fields, metricsLimit));
     } catch (Exception e) {
@@ -992,7 +994,7 @@ public TimelineEntity getFlowRun(
       entity = timelineReaderManager.getEntity(
           TimelineReaderWebServicesUtils.createTimelineReaderContext(
               clusterId, userId, flowName, flowRunId, null,
-              TimelineEntityType.YARN_FLOW_RUN.toString(), null),
+              TimelineEntityType.YARN_FLOW_RUN.toString(), 0L + "", null),
           TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
               null, metricsToRetrieve, null, null));
     } catch (Exception e) {
@@ -1217,7 +1219,7 @@ public TimelineEntity getFlowRun(
       entities = timelineReaderManager.getEntities(
           TimelineReaderWebServicesUtils.createTimelineReaderContext(
               clusterId, userId, flowName, null, null,
-              TimelineEntityType.YARN_FLOW_RUN.toString(), null),
+              TimelineEntityType.YARN_FLOW_RUN.toString(), 0L + "", null),
           TimelineReaderWebServicesUtils.createTimelineEntityFilters(
               limit, createdTimeStart, createdTimeEnd, null, null, null, null,
               null, null),
@@ -1345,7 +1347,7 @@ public TimelineEntity getFlowRun(
       entities = timelineReaderManager.getEntities(
           TimelineReaderWebServicesUtils.createTimelineReaderContext(
               clusterId, null, null, null, null,
-              TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null),
+              TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 0L + "", null),
           entityFilters, TimelineReaderWebServicesUtils.
               createTimelineDataToRetrieve(null, null, null, null));
     } catch (Exception e) {
@@ -1584,7 +1586,7 @@ public TimelineEntity getApp(
       entity = timelineReaderManager.getEntity(
           TimelineReaderWebServicesUtils.createTimelineReaderContext(
               clusterId, userId, flowName, flowRunId, appId,
-              TimelineEntityType.YARN_APPLICATION.toString(), null),
+              TimelineEntityType.YARN_APPLICATION.toString(), 0L + "", null),
           TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
               confsToRetrieve, metricsToRetrieve, fields, metricsLimit));
     } catch (Exception e) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServicesUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServicesUtils.java
index 7fc8cb8..fcc761c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServicesUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServicesUtils.java
@@ -49,10 +49,10 @@ private TimelineReaderWebServicesUtils() {
    */
   static TimelineReaderContext createTimelineReaderContext(String clusterId,
       String userId, String flowName, String flowRunId, String appId,
-      String entityType, String entityId) {
+      String entityType, String entityIdPrefix, String entityId) {
     return new TimelineReaderContext(parseStr(clusterId), parseStr(userId),
         parseStr(flowName), parseLongStr(flowRunId), parseStr(appId),
-        parseStr(entityType), parseStr(entityId));
+        parseStr(entityType), parseLongStr(entityIdPrefix), parseStr(entityId));
   }
 
   /**
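A note on the `0L + ""` arguments threaded through the endpoints above: that is plain string concatenation, so it evaluates to the string "0". None of these REST paths has a UID (and hence a prefix) to work from, so each passes a stringified zero that createTimelineReaderContext parses back into a long. A sketch of the equivalence, assuming parseLongStr is essentially a null-safe wrapper around Long.parseLong, which is how it reads in this class:

```java
String prefix = 0L + "";              // string concatenation -> "0"
long parsed = Long.parseLong(prefix); // what parseLongStr boils down to
assert parsed == 0L;                  // default prefix: start of the entity type
```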
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineUIDConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineUIDConverter.java
index 08e5405..a8d2d18 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineUIDConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineUIDConverter.java
@@ -155,12 +155,16 @@ String encodeUID(TimelineReaderContext context) {
         // Flow information exists.
         String[] entityTupleArr = {context.getClusterId(), context.getUserId(),
             context.getFlowName(), context.getFlowRunId().toString(),
-            context.getAppId(), context.getEntityType(), context.getEntityId()};
+            context.getAppId(), context.getEntityType(),
+            String.valueOf(context.getEntityIdPrefix()),
+            context.getEntityId() };
         return joinAndEscapeUIDParts(entityTupleArr);
       } else {
         // Only entity and app information exists. Flow info does not exist.
         String[] entityTupleArr = {context.getClusterId(), context.getAppId(),
-            context.getEntityType(), context.getEntityId()};
+            context.getEntityType(),
+            String.valueOf(context.getEntityIdPrefix()),
+            context.getEntityId() };
         return joinAndEscapeUIDParts(entityTupleArr);
       }
     }
@@ -171,20 +175,21 @@ TimelineReaderContext decodeUID(String uId) throws Exception {
       return null;
     }
     List<String> entityTupleList = splitUID(uId);
-    // Should have 7 parts i.e. cluster, user, flow name, flowrun id, app id,
-    // entity type and entity id OR should have 4 parts i.e. cluster, app id,
-    // entity type and entity id.
-    if (entityTupleList.size() == 7) {
+    // Should have 8 parts i.e. cluster, user, flow name, flowrun id, app id,
+    // entity type, entity idPrefix and entity id OR should have 5 parts i.e.
+    // cluster, app id, entity type, entity idPrefix and entity id.
+    if (entityTupleList.size() == 8) {
       // Flow information exists.
       return new TimelineReaderContext(entityTupleList.get(0),
           entityTupleList.get(1), entityTupleList.get(2),
           Long.parseLong(entityTupleList.get(3)), entityTupleList.get(4),
-          entityTupleList.get(5), entityTupleList.get(6));
-    } else if (entityTupleList.size() == 4) {
+          entityTupleList.get(5), Long.parseLong(entityTupleList.get(6)),
+          entityTupleList.get(7));
+    } else if (entityTupleList.size() == 5) {
       // Flow information does not exist.
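With the prefix spliced into the tuple, a generic entity UID now has eight `!`-separated parts when flow context is present and five without it. A round-trip sketch with invented ids, assuming GENERIC_ENTITY_UID is the converter constant for this path and ignoring any delimiter escaping (decodeUID is declared to throw, so a real caller would wrap it):

```java
TimelineReaderContext ctx = new TimelineReaderContext("test-cluster", "user1",
    "flow_name", 1002345678919L, "application_1111111111_1111",
    "YARN_CONTAINER", 1234567890L, "container_1111111111_1111_01_000001");

String uid = TimelineUIDConverter.GENERIC_ENTITY_UID.encodeUID(ctx);
// test-cluster!user1!flow_name!1002345678919!application_1111111111_1111
//     !YARN_CONTAINER!1234567890!container_1111111111_1111_01_000001

TimelineReaderContext back =
    TimelineUIDConverter.GENERIC_ENTITY_UID.decodeUID(uid);
assert back.getEntityIdPrefix() == 1234567890L; // prefix survives the trip
```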
      return new TimelineReaderContext(entityTupleList.get(0), null, null,
          null, entityTupleList.get(1), entityTupleList.get(2),
-          entityTupleList.get(3));
+          Long.parseLong(entityTupleList.get(3)), entityTupleList.get(4));
     } else {
       return null;
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java
index ef717c0..7a9173d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java
@@ -41,10 +41,16 @@
    */
   public EntityRowKeyPrefix(String clusterId, String userId, String flowName,
       Long flowRunId, String appId, String entityType) {
-    // TODO YARN-5585, change prefix id from 0L
     super(clusterId, userId, flowName, flowRunId, appId, entityType, 0L, null);
   }
 
+  public EntityRowKeyPrefix(String clusterId, String userId, String flowName,
+      Long flowRunId, String appId, String entityType, long entityIdPrefix,
+      String entityId) {
+    super(clusterId, userId, flowName, flowRunId, appId, entityType,
+        entityIdPrefix, entityId);
+  }
+
   /**
    * Creates a prefix which generates the following rowKeyPrefixes for the
    * entity table:
@@ -58,7 +64,6 @@ public EntityRowKeyPrefix(String clusterId, String userId, String flowName,
    */
   public EntityRowKeyPrefix(String clusterId, String userId, String flowName,
       Long flowRunId, String appId) {
-    // TODO YARN-5585, change prefix id from 0L
     super(clusterId, userId, flowName, flowRunId, appId, null, 0L, null);
   }
 
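The new pass-through constructor lets callers build a row key that descends past the entity type down to a concrete idPrefix!entityId, which is what the reader needs for a scan start row; the old type-only constructor keeps pinning the prefix at 0L. A sketch with invented ids:

```java
// Prefix covering a whole entity type (idPrefix fixed at 0L, entityId null).
RowKeyPrefix<EntityRowKey> typeScope = new EntityRowKeyPrefix("test-cluster",
    "user1", "flow_name", 1002345678919L, "application_1111111111_1111",
    "YARN_CONTAINER");

// Prefix pinned to a single entity: its bytes double as a scan start row.
RowKeyPrefix<EntityRowKey> entityScope = new EntityRowKeyPrefix("test-cluster",
    "user1", "flow_name", 1002345678919L, "application_1111111111_1111",
    "YARN_CONTAINER", 1234567890L, "container_1111111111_1111_01_000001");

byte[] startRow = entityScope.getRowKeyPrefix(); // used by getResults() below
```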
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
index 9ba5e38..c741d0e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
@@ -60,7 +60,7 @@
   public FlowActivityEntityReader(TimelineReaderContext ctxt,
       TimelineEntityFilters entityFilters,
       TimelineDataToRetrieve toRetrieve) {
-    super(ctxt, entityFilters, toRetrieve, true);
+    super(ctxt, entityFilters, toRetrieve);
   }
 
   public FlowActivityEntityReader(TimelineReaderContext ctxt,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
index 986a28f..9b8482c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
@@ -63,7 +63,7 @@
   public FlowRunEntityReader(TimelineReaderContext ctxt,
       TimelineEntityFilters entityFilters,
       TimelineDataToRetrieve toRetrieve) {
-    super(ctxt, entityFilters, toRetrieve, true);
+    super(ctxt, entityFilters, toRetrieve);
   }
 
   public FlowRunEntityReader(TimelineReaderContext ctxt,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
index 1e78a18..d03341b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
@@ -18,11 +18,13 @@
 package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
@@ -81,7 +83,7 @@
   public GenericEntityReader(TimelineReaderContext ctxt,
       TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve,
       boolean sortedKeys) {
-    super(ctxt, entityFilters, toRetrieve, sortedKeys);
+    super(ctxt, entityFilters, toRetrieve);
   }
 
   public GenericEntityReader(TimelineReaderContext ctxt,
@@ -520,11 +522,32 @@ protected ResultScanner getResults(Configuration hbaseConf, Connection conn,
     // and one type
     Scan scan = new Scan();
     TimelineReaderContext context = getContext();
-    RowKeyPrefix<EntityRowKey> entityRowKeyPrefix =
-        new EntityRowKeyPrefix(context.getClusterId(), context.getUserId(),
-            context.getFlowName(), context.getFlowRunId(), context.getAppId(),
-            context.getEntityType());
-    scan.setRowPrefixFilter(entityRowKeyPrefix.getRowKeyPrefix());
+    RowKeyPrefix<EntityRowKey> entityRowKeyPrefix = null;
+    // default mode, will always scan from the beginning of the entity type
+    if (context.getEntityId() == null) {
+      entityRowKeyPrefix = new EntityRowKeyPrefix(
+          context.getClusterId(), context.getUserId(), context.getFlowName(),
+          context.getFlowRunId(), context.getAppId(), context.getEntityType());
+      scan.setRowPrefixFilter(entityRowKeyPrefix.getRowKeyPrefix());
+    } else { // pagination mode, will scan from given entityIdPrefix!entityId
+      entityRowKeyPrefix = new EntityRowKeyPrefix(context.getClusterId(),
+          context.getUserId(), context.getFlowName(), context.getFlowRunId(),
+          context.getAppId(), context.getEntityType(),
+          context.getEntityIdPrefix(), context.getEntityId());
+
+      // set start row
+      scan.setStartRow(entityRowKeyPrefix.getRowKeyPrefix());
+
+      // get the bytes for stop row
+      entityRowKeyPrefix = new EntityRowKeyPrefix(context.getClusterId(),
+          context.getUserId(), context.getFlowName(), context.getFlowRunId(),
+          context.getAppId(), context.getEntityType());
+
+      // set stop row
+      scan.setStopRow(calculateTheClosestNextRowKeyForPrefix(
+          entityRowKeyPrefix.getRowKeyPrefix()));
+    }
+
     scan.setMaxVersions(getDataToRetrieve().getMetricsLimit());
     if (filterList != null && !filterList.getFilters().isEmpty()) {
       scan.setFilter(filterList);
@@ -538,10 +561,10 @@ protected TimelineEntity parseEntity(Result result) throws IOException {
       return null;
     }
     TimelineEntity entity = new TimelineEntity();
-    String entityType = EntityColumn.TYPE.readResult(result).toString();
-    entity.setType(entityType);
-    String entityId = EntityColumn.ID.readResult(result).toString();
-    entity.setId(entityId);
+    EntityRowKey parseRowKey = EntityRowKey.parseRowKey(result.getRow());
+    entity.setType(parseRowKey.getEntityType());
+    entity.setId(parseRowKey.getEntityId());
+    entity.setIdPrefix(parseRowKey.getEntityIdPrefix());
     TimelineEntityFilters filters = getFilters();
 
     // fetch created time
@@ -647,4 +670,30 @@ protected TimelineEntity parseEntity(Result result) throws IOException {
       entity.addInfo(columns);
     }
   }
+
+  private byte[] calculateTheClosestNextRowKeyForPrefix(byte[] rowKeyPrefix) {
+    // Essentially we are treating it like an 'unsigned very very long' and
+    // doing +1 manually.
+    // Search for the place where the trailing 0xFFs start
+    int offset = rowKeyPrefix.length;
+    while (offset > 0) {
+      if (rowKeyPrefix[offset - 1] != (byte) 0xFF) {
+        break;
+      }
+      offset--;
+    }
+
+    if (offset == 0) {
+      // We got a 0xFFFF... (only FFs) stopRow value which is
+      // the last possible prefix before the end of the table.
+      // So set it to stop at the 'end of the table'.
+      return HConstants.EMPTY_END_ROW;
+    }
+
+    // Copy the right length of the original
+    byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset);
+    // And increment the last one
+    newStopRow[newStopRow.length - 1]++;
+    return newStopRow;
+  }
 }
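In pagination mode getResults() now scans the half-open range [startRow, stopRow): the start row is the prefix pinned to the requested entityIdPrefix!entityId, and the stop row is the type-level prefix "plus one", computed by treating the prefix bytes as a single unsigned integer. A self-contained sketch of that arithmetic (the same logic as calculateTheClosestNextRowKeyForPrefix above, run on made-up keys):

```java
import java.util.Arrays;

public class NextRowKeyDemo {
  static byte[] next(byte[] prefix) {
    int offset = prefix.length;
    while (offset > 0 && prefix[offset - 1] == (byte) 0xFF) {
      offset--;                 // skip the trailing run of 0xFF bytes
    }
    if (offset == 0) {
      return new byte[0];       // all 0xFF: no next prefix, scan to table end
    }
    byte[] stop = Arrays.copyOfRange(prefix, 0, offset);
    stop[stop.length - 1]++;    // "+1" on the last non-0xFF byte
    return stop;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(next(new byte[] {0x01, 0x02})));
    // [1, 3]  -- plain increment of the last byte
    System.out.println(Arrays.toString(next(new byte[] {0x01, (byte) 0xFF})));
    // [2]     -- the 0xFF is dropped and the carry ripples left
    System.out.println(Arrays.toString(next(new byte[] {(byte) 0xFF})));
    // []      -- empty stop row, i.e. HConstants.EMPTY_END_ROW
  }
}
```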
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
index 7b294a8..3134312 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
@@ -21,11 +21,10 @@
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.NavigableMap;
-import java.util.NavigableSet;
 import java.util.Set;
-import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -75,14 +74,6 @@
   private BaseTable<?> table;
 
   /**
-   * Specifies whether keys for this table are sorted in a manner where entities
-   * can be retrieved by created time. If true, it will be sufficient to collect
-   * the first results as specified by the limit. Otherwise all matched entities
-   * will be fetched and then limit applied.
-   */
-  private boolean sortedKeys = false;
-
-  /**
    * Used to convert strings key components to and from storage format.
    */
   private final KeyConverter<String> stringKeyConverter =
@@ -99,10 +90,8 @@
    * If sorted, entities can be retrieved by created time.
    */
   protected TimelineEntityReader(TimelineReaderContext ctxt,
-      TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve,
-      boolean sortedKeys) {
+      TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) {
     this.singleEntityRead = false;
-    this.sortedKeys = sortedKeys;
     this.context = ctxt;
     this.dataToRetrieve = toRetrieve;
     this.filters = entityFilters;
@@ -249,7 +238,7 @@ public TimelineEntity readEntity(Configuration hbaseConf, Connection conn)
     validateParams();
     augmentParams(hbaseConf, conn);
 
-    NavigableSet<TimelineEntity> entities = new TreeSet<>();
+    Set<TimelineEntity> entities = new LinkedHashSet<>();
     FilterList filterList = createFilterList();
     if (LOG.isDebugEnabled() && filterList != null) {
       LOG.debug("FilterList created for scan is - " + filterList);
@@ -262,14 +251,8 @@
           continue;
         }
         entities.add(entity);
-        if (!sortedKeys) {
-          if (entities.size() > filters.getLimit()) {
-            entities.pollLast();
-          }
-        } else {
-          if (entities.size() == filters.getLimit()) {
-            break;
-          }
+        if (entities.size() == filters.getLimit()) {
+          break;
         }
       }
       return entities;
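Since the entity row key now embeds the id prefix, HBase already returns rows in exactly the order a paginated caller consumes them, so readEntities() can keep insertion order in a LinkedHashSet and bail out at the limit, rather than pouring everything into a TreeSet and trimming with pollLast(). A toy illustration of why the early exit is now safe, using invented row keys:

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class LimitDemo {
  public static void main(String[] args) {
    // Rows as HBase would hand them back: already sorted by row key,
    // i.e. by (entityIdPrefix, entityId). Values are made up.
    String[] scanned = {"10!entity-a", "10!entity-b", "20!entity-a",
        "30!entity-z"};
    int limit = 2;

    // Insertion order == row-key order, so stopping at the limit yields
    // the first page with no sorting or trimming.
    Set<String> page = new LinkedHashSet<>();
    for (String row : scanned) {
      page.add(row);
      if (page.size() == limit) {
        break;
      }
    }
    System.out.println(page); // [10!entity-a, 10!entity-b]
  }
}
```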