diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index e48ca60..6bda25e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -141,6 +141,13 @@ private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion) EntityColumn.MODIFIED_TIME.store(rowKey, entityTable, null, te.getModifiedTime()); EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion); + Map<String, Object> info = te.getInfo(); + if (info != null) { + for (Map.Entry<String, Object> entry : info.entrySet()) { + EntityColumnPrefix.INFO.store(rowKey, entityTable, entry.getKey(), + null, entry.getValue()); + } + } } /** @@ -186,6 +193,10 @@ private void storeEvents(byte[] rowKey, Set<TimelineEvent> events) if (event != null) { String eventId = event.getId(); if (eventId != null) { + Long eventTimestamp = event.getTimestamp(); + if (eventTimestamp == null) { + eventTimestamp = System.currentTimeMillis(); + } Map<String, Object> eventInfo = event.getInfo(); if (eventInfo != null) { for (Map.Entry<String, Object> info : eventInfo.entrySet()) { @@ -198,8 +209,8 @@ private void storeEvents(byte[] rowKey, Set<TimelineEvent> events) // convert back to string to avoid additional API on store. 
String compoundColumnQualifier = Bytes.toString(compoundColumnQualifierBytes); - EntityColumnPrefix.METRIC.store(rowKey, entityTable, - compoundColumnQualifier, null, info.getValue()); + EntityColumnPrefix.EVENT.store(rowKey, entityTable, + compoundColumnQualifier, eventTimestamp, info.getValue()); } // for info: eventInfo } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java index 4459868..b9f003e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java @@ -45,6 +45,11 @@ RELATES_TO(EntityColumnFamily.INFO, "r"), /** + * To store TimelineEntity info values. 
+ */ + INFO(EntityColumnFamily.INFO, "i"), + + /** * Lifecycle events for an entity */ EVENT(EntityColumnFamily.INFO, "e"), @@ -92,7 +97,7 @@ private EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily, /** * @return the column name value */ - private String getColumnPrefix() { + public String getColumnPrefix() { return columnPrefix; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java index 6abf240..0a79c45 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java @@ -43,8 +43,10 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; +import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; @@ -84,6 +86,12 @@ public void testWriteEntityToHBase() 
throws Exception { entity.setCreatedTime(cTime); entity.setModifiedTime(mTime); + // add the info map in Timeline Entity + Map<String, Object> infoMap = new HashMap<String, Object>(); + infoMap.put("infoMapKey1", "infoMapValue1"); + infoMap.put("infoMapKey2", 10); + entity.addInfo(infoMap); + // add the isRelatedToEntity info String key = "task"; String value = "is_related_to_entity_id_here"; @@ -177,6 +185,14 @@ public void testWriteEntityToHBase() throws Exception { Long mTime1 = val.longValue(); assertEquals(mTime1, mTime); + Map<String, Object> infoColumns = + EntityColumnPrefix.INFO.readResults(result); + assertEquals(infoMap.size(), infoColumns.size()); + for (String infoItem : infoMap.keySet()) { + assertEquals(infoMap.get(infoItem), + infoColumns.get(infoItem)); + } + // Remember isRelatedTo is of type Map<String, Set<String>> for (String isRelatedToKey : isRelatedTo.keySet()) { Object isRelatedToValue = @@ -237,7 +253,7 @@ public void testWriteEntityToHBase() throws Exception { } } assertEquals(1, rowCount); - assertEquals(15, colCount); + assertEquals(17, colCount); } finally { hbi.stop(); @@ -268,12 +284,15 @@ private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user, private void testAdditionalEntity() throws IOException { TimelineEvent event = new TimelineEvent(); event.setId("foo_event_id"); - event.setTimestamp(System.currentTimeMillis()); - event.addInfo("foo_event", "test"); + Long expTs = 1436512802000L; + event.setTimestamp(expTs); + Object expVal = "test"; + event.addInfo("foo_event", expVal); final TimelineEntity entity = new TimelineEntity(); entity.setId("attempt_1329348432655_0001_m_000008_18"); entity.setType("FOO_ATTEMPT"); + entity.addEvent(event); TimelineEntities entities = new TimelineEntities(); entities.addEntity(entity); @@ -304,6 +323,39 @@ private void testAdditionalEntity() throws IOException { for (Result result : scanner) { if (result != null && !result.isEmpty()) { rowCount++; + NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> resultMap = result.getMap(); + NavigableMap<byte[], NavigableMap<Long, byte[]>> columnCellMap = resultMap + 
.get(EntityColumnFamily.INFO.getBytes()); + for (Map.Entry<byte[], NavigableMap<Long, byte[]>> entry : columnCellMap + .entrySet()) { + byte[] qualifierb = entry.getKey(); + String qualifier = Bytes.toString(qualifierb); + if (qualifier != null) { + // since there is only one event, check for that exact event id + if (qualifier.startsWith(EntityColumnPrefix.EVENT + .getColumnPrefix())) { + // TODO + // should have a read method to read events + // just like there are methods to read config and timeseries + // data + // in EntityColumnPrefix.EVENT.readX + // the following code that asserts the column qualifier and + // values + // should be updated to use such a read method + assertTrue(qualifier.equals((EntityColumnPrefix.EVENT + .getColumnPrefix() + "!foo_event_id?foo_event"))); + NavigableMap<Long, byte[]> valueM = entry.getValue(); + assertNotNull(valueM); + // only one event, hence confirm size to be 1 + assertEquals(1, valueM.size()); + Long ts = valueM.firstKey(); + assertEquals(expTs, ts); + byte[] val = valueM.get(ts); + assertEquals(expVal, GenericObjectMapper.read(val)); + } + } + } } } assertEquals(1, rowCount);