diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java index 9c0a983..19fdda4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java @@ -145,6 +145,10 @@ public boolean equals(Object obj) { private HashMap> isRelatedToEntities = new HashMap<>(); private HashMap> relatesToEntities = new HashMap<>(); private Long createdTime; + // default value is zero. Note : if this value is changed for any specific + // entity then timeline clients MUST provide the same prefix for all + // subsequent updates of the same entity. + private long idPrefix = 0L; public TimelineEntity() { identifier = new Identifier(); @@ -581,4 +585,22 @@ public String toString() { return real.toString(); } } + + @XmlElement(name = "idprefix") + public long getIdPrefix() { + if (real == null) { + return idPrefix; + } else { + return real.getIdPrefix(); + } + } + + @JsonSetter("idprefix") + public void setIdPrefix(long entityIdPrefix) { + if (real == null) { + this.idPrefix = entityIdPrefix; + } else { + real.setIdPrefix(entityIdPrefix); + } + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderContext.java index 633bb23..c8a96cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderContext.java @@ -31,6 +31,7 @@ private String entityType; private String entityId; + private Long entityIdPrefix; public TimelineReaderContext(String clusterId, String userId, String flowName, Long flowRunId, String appId, String entityType, String entityId) { super(clusterId, userId, flowName, flowRunId, appId); @@ -38,10 +39,19 @@ public TimelineReaderContext(String clusterId, String userId, String flowName, this.entityId = entityId; } + public TimelineReaderContext(String clusterId, String userId, String flowName, + Long flowRunId, String appId, String entityType, Long entityIdPrefix, + String entityId) { + super(clusterId, userId, flowName, flowRunId, appId); + this.entityType = entityType; + this.entityId = entityId; + this.entityIdPrefix = entityIdPrefix; + } + public TimelineReaderContext(TimelineReaderContext other) { this(other.getClusterId(), other.getUserId(), other.getFlowName(), other.getFlowRunId(), other.getAppId(), other.getEntityType(), - other.getEntityId()); + other.getEntityIdPrefix(), other.getEntityId()); } @Override @@ -95,4 +105,12 @@ public String getEntityId() { public void setEntityId(String id) { this.entityId = id; } + + public Long getEntityIdPrefix() { + return entityIdPrefix; + } + + public void setEntityIdPrefix(Long entityIdPrefix) { + this.entityIdPrefix = entityIdPrefix; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java index 1f527f2..ddfd095 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java @@ -85,9 +85,10 @@ private synchronized void write(String clusterId, String userId, throws IOException { PrintWriter out = null; try { + long idPrefix = entity.getIdPrefix(); String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId, escape(flowName), escape(flowVersion), String.valueOf(flowRun), appId, - entity.getType()); + entity.getType(), String.valueOf(idPrefix)); String fileName = dir + entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION; out = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index c0d1fba..a8b44f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -159,7 +159,7 @@ public TimelineWriteResponse write(String clusterId, String userId, } else { EntityRowKey entityRowKey = new EntityRowKey(clusterId, userId, flowName, flowRunId, appId, - te.getType(), te.getId()); + te.getType(), te.getIdPrefix(), te.getId()); rowKey = entityRowKey.getRowKey(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java index ff22178..8b35906 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java @@ -33,18 +33,21 @@ private final Long flowRunId; private final String appId; private final String entityType; + private final long entityIdPrefix; private final String entityId; private final KeyConverter entityRowKeyConverter = new EntityRowKeyConverter(); public EntityRowKey(String clusterId, String userId, String flowName, - Long flowRunId, String appId, String entityType, String entityId) { + Long flowRunId, String appId, String entityType, long entityIdPrefix, + String entityId) { this.clusterId = clusterId; this.userId = userId; this.flowName = flowName; this.flowRunId = flowRunId; this.appId = appId; this.entityType = entityType; + this.entityIdPrefix = entityIdPrefix; this.entityId = entityId; } @@ -76,6 +79,10 @@ public String getEntityId() { return entityId; } + public long getEntityIdPrefix() { + return entityIdPrefix; + } + /** * Constructs a row key for the entity table as follows: * {@code userName!clusterId!flowName!flowRunId!AppId!entityType!entityId}. @@ -126,7 +133,7 @@ private EntityRowKeyConverter() { private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE, - Separator.VARIABLE_SIZE }; + Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE }; /* * (non-Javadoc) @@ -172,11 +179,15 @@ private EntityRowKeyConverter() { byte[] entityType = Separator.encode(rowKey.getEntityType(), Separator.SPACE, Separator.TAB, Separator.QUALIFIERS); + + byte[] enitityIdPrefix = Bytes.toBytes(rowKey.getEntityIdPrefix()); + byte[] entityId = rowKey.getEntityId() == null ? Separator.EMPTY_BYTES : Separator .encode(rowKey.getEntityId(), Separator.SPACE, Separator.TAB, Separator.QUALIFIERS); - byte[] fourth = Separator.QUALIFIERS.join(entityType, entityId); + byte[] fourth = + Separator.QUALIFIERS.join(entityType, enitityIdPrefix, entityId); return Separator.QUALIFIERS.join(first, second, third, fourth); } @@ -196,7 +207,7 @@ private EntityRowKeyConverter() { public EntityRowKey decode(byte[] rowKey) { byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES); - if (rowKeyComponents.length != 7) { + if (rowKeyComponents.length != 8) { throw new IllegalArgumentException("the row key is not valid for " + "an entity"); } @@ -215,11 +226,14 @@ public EntityRowKey decode(byte[] rowKey) { String entityType = Separator.decode(Bytes.toString(rowKeyComponents[5]), Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); + + Long entityPrefixId = Bytes.toLong(rowKeyComponents[6]); + String entityId = - Separator.decode(Bytes.toString(rowKeyComponents[6]), + Separator.decode(Bytes.toString(rowKeyComponents[7]), Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); return new EntityRowKey(clusterId, userId, flowName, flowRunId, appId, - entityType, entityId); + entityType, entityPrefixId, entityId); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java index 9146180..f0bbf53 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java @@ -41,7 +41,14 @@ */ public EntityRowKeyPrefix(String clusterId, String userId, String flowName, Long flowRunId, String appId, String entityType) { - super(clusterId, userId, flowName, flowRunId, appId, entityType, null); + super(clusterId, userId, flowName, flowRunId, appId, entityType, 0L, + null); + } + + public EntityRowKeyPrefix(String clusterId, String userId, String flowName, + Long flowRunId, String appId, String entityType, Long entityIdPrefix) { + super(clusterId, userId, flowName, flowRunId, appId, entityType, + entityIdPrefix, null); } /** @@ -57,7 +64,7 @@ public EntityRowKeyPrefix(String clusterId, String userId, String flowName, */ public EntityRowKeyPrefix(String clusterId, String userId, String flowName, Long flowRunId, String appId) { - super(clusterId, userId, flowName, flowRunId, appId, null, null); + super(clusterId, userId, flowName, flowRunId, appId, null, 0L, null); } /* diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java index b194f07..027c8d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java @@ -50,8 +50,8 @@ * | flowRunId! | | | configKey2: | * | AppId! | created_time: | metricId1: | configValue2 | * | entityType!| 1392993084018 | metricValue2 | | - * | entityId | | @timestamp2 | | - * | | i!infoKey: | | | + * | idPrefix! | | @timestamp2 | | + * | entityId | i!infoKey: | | | * | | infoValue | metricId1: | | * | | | metricValue1 | | * | | r!relatesToKey: | @timestamp2 | | diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java index 4e1ab8a..1e78a18 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java @@ -502,7 +502,9 @@ protected Result getResult(Configuration hbaseConf, Connection conn, byte[] rowKey = new EntityRowKey(context.getClusterId(), context.getUserId(), context.getFlowName(), context.getFlowRunId(), context.getAppId(), - context.getEntityType(), context.getEntityId()).getRowKey(); + // TODO YARN-5585, change prefix id from 0L + context.getEntityType(), 0L, context.getEntityId()).getRowKey(); + Get get = new Get(rowKey); get.setMaxVersions(getDataToRetrieve().getMetricsLimit()); if (filterList != null && !filterList.getFilters().isEmpty()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java index 4f12c57..8a67aac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java @@ -96,8 +96,8 @@ public void testWriteEntityToFile() throws Exception { File.separator + "cluster_id" + File.separator + "user_id" + File.separator + "flow_name" + File.separator + "flow_version" + File.separator + "12345678" + File.separator + "app_id" + - File.separator + type + File.separator + id + - FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + File.separator + type + File.separator + "0" + File.separator + id + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; Path path = Paths.get(fileName); File f = new File(fileName); assertTrue(f.exists() && !f.isDirectory()); @@ -113,8 +113,8 @@ public void testWriteEntityToFile() throws Exception { File.separator + "cluster_id" + File.separator + "user_id" + File.separator + "flow_name" + File.separator + "flow_version" + File.separator + "12345678" + File.separator + "app_id" + - File.separator + type2 + File.separator + id2 + - FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + File.separator + type2 + File.separator + "0" + File.separator + id2 + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; Path path2 = Paths.get(fileName2); File file = new File(fileName2); assertTrue(file.exists() && !file.isDirectory()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java index 368b060..776b5e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java @@ -137,9 +137,10 @@ public void testAppToFlowRowKey() { public void testEntityRowKey() { String entityId = "!ent!ity!!id!"; String entityType = "entity!Type"; + long entityIdPrefix = 0; byte[] byteRowKey = new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID, - entityType, entityId).getRowKey(); + entityType, entityIdPrefix, entityId).getRowKey(); EntityRowKey rowKey = EntityRowKey.parseRowKey(byteRowKey); assertEquals(CLUSTER, rowKey.getClusterId()); assertEquals(USER, rowKey.getUserId()); @@ -158,9 +159,9 @@ public void testEntityRowKey() { new int[] {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE, - Separator.VARIABLE_SIZE}); - assertEquals(7, splits.length); - assertEquals(0, splits[6].length); + Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE }); + assertEquals(8, splits.length); + assertEquals(0, splits[7].length); assertEquals(APPLICATION_ID, new AppIdKeyConverter().decode(splits[4])); assertEquals(entityType, Separator.QUALIFIERS.decode(Bytes.toString(splits[5])));