diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEvent.java index 1dbf7e5..a563658 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEvent.java @@ -33,6 +33,8 @@ @InterfaceAudience.Public @InterfaceStability.Unstable public class TimelineEvent implements Comparable { + public static final long INVALID_TIMESTAMP = 0L; + private String id; private HashMap info = new HashMap<>(); private long timestamp; @@ -83,7 +85,7 @@ public void setTimestamp(long timestamp) { } public boolean isValid() { - return (id != null && timestamp != 0L); + return (id != null && timestamp != INVALID_TIMESTAMP); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index e48ca60..12fffa9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -141,6 +141,13 @@ private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion) EntityColumn.MODIFIED_TIME.store(rowKey, entityTable, null, 
te.getModifiedTime()); EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion); + Map info = te.getInfo(); + if (info != null) { + for (Map.Entry entry : info.entrySet()) { + EntityColumnPrefix.INFO.store(rowKey, entityTable, entry.getKey(), + null, entry.getValue()); + } + } } /** @@ -186,6 +193,13 @@ private void storeEvents(byte[] rowKey, Set events) if (event != null) { String eventId = event.getId(); if (eventId != null) { + long eventTimestamp = event.getTimestamp(); + // if the timestamp is not set, use the current timestamp + if (eventTimestamp == TimelineEvent.INVALID_TIMESTAMP) { + LOG.warn("timestamp is not set for event " + eventId + + "! Using the current timestamp"); + eventTimestamp = System.currentTimeMillis(); + } Map eventInfo = event.getInfo(); if (eventInfo != null) { for (Map.Entry info : eventInfo.entrySet()) { @@ -198,8 +212,8 @@ private void storeEvents(byte[] rowKey, Set events) // convert back to string to avoid additional API on store. String compoundColumnQualifier = Bytes.toString(compoundColumnQualifierBytes); - EntityColumnPrefix.METRIC.store(rowKey, entityTable, - compoundColumnQualifier, null, info.getValue()); + EntityColumnPrefix.EVENT.store(rowKey, entityTable, + compoundColumnQualifier, eventTimestamp, info.getValue()); } // for info: eventInfo } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java index 6a204dc..10537a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java @@ -116,16 +116,19 @@ public Object readResult(Result result, byte[] columnQualifierBytes) * @param result from which to reads timeseries data * @param columnPrefixBytes optional prefix to limit columns. If null all * columns are returned. + * @param the type of the time series values. The values will be cast into + * that type. * @return the cell values at each respective time in for form * {idA={timestamp1->value1}, idA={timestamp2->value2}, * idB={timestamp3->value3}, idC={timestamp1->value4}} * @throws IOException */ - public NavigableMap> readTimeseriesResults( + @SuppressWarnings("unchecked") + public NavigableMap> readTimeseriesResults( Result result, byte[] columnPrefixBytes) throws IOException { - NavigableMap> results = - new TreeMap>(); + NavigableMap> results = + new TreeMap>(); if (result != null) { NavigableMap>> resultMap = @@ -157,13 +160,13 @@ public Object readResult(Result result, byte[] columnQualifierBytes) // If this column has the prefix we want if (columnName != null) { - NavigableMap cellResults = - new TreeMap(); + NavigableMap cellResults = + new TreeMap(); NavigableMap cells = entry.getValue(); if (cells != null) { for (Entry cell : cells.entrySet()) { - Number value = - (Number) GenericObjectMapper.read(cell.getValue()); + T value = + (T) GenericObjectMapper.read(cell.getValue()); cellResults.put(cell.getKey(), value); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java index 2eedea0..b6b559b 100644 ---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java @@ -73,11 +73,13 @@ public void store(byte[] rowKey, TypedBufferedMutator tableMutator, /** * @param result from which to reads timeseries data + * @param the type of the time series values. The values will be cast into + * that type. * @return the cell values at each respective time in for form * {idA={timestamp1->value1}, idA={timestamp2->value2}, * idB={timestamp3->value3}, idC={timestamp1->value4}} * @throws IOException */ - public NavigableMap> readTimeseriesResults( + public NavigableMap> readTimeseriesResults( Result result) throws IOException; } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java index ee57890..3319419 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java @@ -90,6 +90,13 @@ private Separator(String value, String encodedValue) { } /** + * @return the original value of the separator + */ + public String getValue() { + return value; + } + + /** * Used to make token safe to be used with this separator without collisions.
* * @param token diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java index 4459868..fe8e6b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java @@ -45,6 +45,11 @@ RELATES_TO(EntityColumnFamily.INFO, "r"), /** + * To store TimelineEntity info values. + */ + INFO(EntityColumnFamily.INFO, "i"), + + /** * Lifecycle events for an entity */ EVENT(EntityColumnFamily.INFO, "e"), @@ -92,7 +97,7 @@ private EntityColumnPrefix(ColumnFamily columnFamily, /** * @return the column name value */ - private String getColumnPrefix() { + public String getColumnPrefix() { return columnPrefix; } @@ -152,7 +157,7 @@ public Object readResult(Result result, String qualifier) throws IOException { * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix * #readTimeseriesResults(org.apache.hadoop.hbase.client.Result) */ - public NavigableMap> readTimeseriesResults( + public NavigableMap> readTimeseriesResults( Result result) throws IOException { return column.readTimeseriesResults(result, columnPrefixBytes); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java index 6abf240..c4ae311 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java @@ -43,8 +43,10 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; +import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; @@ -84,6 +86,12 @@ public void testWriteEntityToHBase() throws Exception { entity.setCreatedTime(cTime); entity.setModifiedTime(mTime); + // add the info map in Timeline Entity + Map infoMap = new HashMap(); + infoMap.put("infoMapKey1", "infoMapValue1"); + infoMap.put("infoMapKey2", 10); + entity.addInfo(infoMap); + // add the isRelatedToEntity info String key = "task"; String value = "is_related_to_entity_id_here"; @@ -177,6 +185,14 @@ public void testWriteEntityToHBase() throws Exception { Long mTime1 = val.longValue(); assertEquals(mTime1, mTime); + Map infoColumns = + 
EntityColumnPrefix.INFO.readResults(result); + assertEquals(infoMap.size(), infoColumns.size()); + for (String infoItem : infoMap.keySet()) { + assertEquals(infoMap.get(infoItem), + infoColumns.get(infoItem)); + } + // Remember isRelatedTo is of type Map> for (String isRelatedToKey : isRelatedTo.keySet()) { Object isRelatedToValue = @@ -237,7 +253,7 @@ public void testWriteEntityToHBase() throws Exception { } } assertEquals(1, rowCount); - assertEquals(15, colCount); + assertEquals(17, colCount); } finally { hbi.stop(); @@ -267,13 +283,18 @@ private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user, private void testAdditionalEntity() throws IOException { TimelineEvent event = new TimelineEvent(); - event.setId("foo_event_id"); - event.setTimestamp(System.currentTimeMillis()); - event.addInfo("foo_event", "test"); + String eventId = "foo_event_id"; + event.setId(eventId); + Long expTs = 1436512802000L; + event.setTimestamp(expTs); + String expKey = "foo_event"; + Object expVal = "test"; + event.addInfo(expKey, expVal); final TimelineEntity entity = new TimelineEntity(); entity.setId("attempt_1329348432655_0001_m_000008_18"); entity.setType("FOO_ATTEMPT"); + entity.addEvent(event); TimelineEntities entities = new TimelineEntities(); entities.addEntity(entity); @@ -304,6 +325,31 @@ private void testAdditionalEntity() throws IOException { for (Result result : scanner) { if (result != null && !result.isEmpty()) { rowCount++; + + // check the row key + byte[] row1 = result.getRow(); + assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName, + entity)); + + // check the events + NavigableMap> eventsResult = + EntityColumnPrefix.EVENT.readTimeseriesResults(result); + // there should be only one event + assertEquals(1, eventsResult.size()); + // key name for the event + String valueKey = eventId + Separator.VALUES.getValue() + expKey; + for (Map.Entry> e : + eventsResult.entrySet()) { + // the value key must match + assertEquals(valueKey, 
e.getKey()); + NavigableMap value = e.getValue(); + // there should be only one timestamp and value + assertEquals(1, value.size()); + for (Map.Entry e2: value.entrySet()) { + assertEquals(expTs, e2.getKey()); + assertEquals(expVal, e2.getValue()); + } + } } } assertEquals(1, rowCount);