diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
index 094f868..6478818 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
@@ -19,12 +19,9 @@
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
@@ -431,44 +428,51 @@ private static TimelineEntity parseEntity(
     Map<String, Object> columns = prefix.readResults(result);
     if (isConfig) {
       for (Map.Entry<String, Object> column : columns.entrySet()) {
-        entity.addConfig(column.getKey(), column.getKey().toString());
+        entity.addConfig(column.getKey(), column.getValue().toString());
       }
     } else {
       entity.addInfo(columns);
     }
   }

+  /**
+   * Read events from the entity table or the application table. The column
+   * name is of the form "eventId=timestamp=infoKey" where "infoKey" may be
+   * omitted if there is no info associated with the event.
+   *
+   * See {@link EntityTable} and {@link ApplicationTable} for a more detailed
+   * schema description.
+   */
   private static void readEvents(TimelineEntity entity, Result result,
       boolean isApplication) throws IOException {
     Map<String, TimelineEvent> eventsMap = new HashMap<>();
-    Map<String, Object> eventsResult = isApplication ?
-        ApplicationColumnPrefix.EVENT.readResults(result) :
-        EntityColumnPrefix.EVENT.readResults(result);
-    for (Map.Entry<String, Object> eventResult : eventsResult.entrySet()) {
-      Collection<String> tokens =
-          Separator.VALUES.splitEncoded(eventResult.getKey());
-      if (tokens.size() != 2 && tokens.size() != 3) {
-        throw new IOException(
-            "Invalid event column name: " + eventResult.getKey());
-      }
-      Iterator<String> idItr = tokens.iterator();
-      String id = idItr.next();
-      String tsStr = idItr.next();
-      // TODO: timestamp is not correct via ser/des through UTF-8 string
-      Long ts =
-          TimelineWriterUtils.invert(Bytes.toLong(tsStr.getBytes(
-              StandardCharsets.UTF_8)));
-      String key = Separator.VALUES.joinEncoded(id, ts.toString());
-      TimelineEvent event = eventsMap.get(key);
-      if (event == null) {
-        event = new TimelineEvent();
-        event.setId(id);
-        event.setTimestamp(ts);
-        eventsMap.put(key, event);
-      }
-      if (tokens.size() == 3) {
-        String infoKey = idItr.next();
-        event.addInfo(infoKey, eventResult.getValue());
+    Map<byte[][], Object> eventsResult = isApplication ?
+        ApplicationColumnPrefix.EVENT.
+            readResultsHavingCompoundColumnQualifiers(result) :
+        EntityColumnPrefix.EVENT.
+            readResultsHavingCompoundColumnQualifiers(result);
+    for (Map.Entry<byte[][], Object> eventResult : eventsResult.entrySet()) {
+      byte[][] karr = eventResult.getKey();
+      // the column name is of the form "eventId=timestamp=infoKey"
+      if (karr.length == 3) {
+        String id = Bytes.toString(karr[0]);
+        long ts = TimelineWriterUtils.invert(Bytes.toLong(karr[1]));
+        String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
+        TimelineEvent event = eventsMap.get(key);
+        if (event == null) {
+          event = new TimelineEvent();
+          event.setId(id);
+          event.setTimestamp(ts);
+          eventsMap.put(key, event);
+        }
+        // handle empty info
+        String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]);
+        if (infoKey != null) {
+          event.addInfo(infoKey, eventResult.getValue());
+        }
+      } else {
+        LOG.warn("incorrectly formatted column name: it will be discarded");
+        continue;
       }
     }
     Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
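Note on the timestamp handling in readEvents above: the qualifier stores the event timestamp inverted so that HBase's unsigned, lexicographic qualifier ordering puts the most recent event first. The self-contained sketch below is illustration only, not part of the patch; it assumes TimelineWriterUtils.invert(v) is Long.MAX_VALUE - v, consistent with how the reader and the tests in this patch invert the value back.

    import java.nio.ByteBuffer;
    import java.util.Arrays;

    public class InvertedTimestampDemo {
      // Big-endian encoding of the inverted timestamp, mirroring
      // Bytes.toBytes(TimelineWriterUtils.invert(ts)) under the assumption
      // that invert(v) == Long.MAX_VALUE - v.
      static byte[] invertedBytes(long ts) {
        return ByteBuffer.allocate(8).putLong(Long.MAX_VALUE - ts).array();
      }

      public static void main(String[] args) {
        byte[] older = invertedBytes(1436512802000L);
        byte[] newer = invertedBytes(1436512803000L);
        // Unsigned lexicographic comparison, like HBase's qualifier ordering.
        System.out.println(Arrays.compareUnsigned(newer, older) < 0); // true
      }
    }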
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index 96192cc..772002d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -300,25 +300,27 @@ private void storeEvents(byte[] rowKey, Set<TimelineEvent> events,
             byte[] compoundColumnQualifierBytes =
                 Separator.VALUES.join(columnQualifierWithTsBytes, null);
-            String compoundColumnQualifier =
-                Bytes.toString(compoundColumnQualifierBytes);
-            EntityColumnPrefix.EVENT.store(rowKey, entityTable,
-                compoundColumnQualifier, null, TimelineWriterUtils.EMPTY_BYTES);
+            if (isApplication) {
+              ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
+                  compoundColumnQualifierBytes, null,
+                  TimelineWriterUtils.EMPTY_BYTES);
+            } else {
+              EntityColumnPrefix.EVENT.store(rowKey, entityTable,
+                  compoundColumnQualifierBytes, null,
+                  TimelineWriterUtils.EMPTY_BYTES);
+            }
           } else {
             for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
               // eventId?infoKey
               byte[] compoundColumnQualifierBytes =
                   Separator.VALUES.join(columnQualifierWithTsBytes,
                       Bytes.toBytes(info.getKey()));
-              // convert back to string to avoid additional API on store.
-              String compoundColumnQualifier =
-                  Bytes.toString(compoundColumnQualifierBytes);
               if (isApplication) {
                 ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
-                    compoundColumnQualifier, null, info.getValue());
+                    compoundColumnQualifierBytes, null, info.getValue());
               } else {
                 EntityColumnPrefix.EVENT.store(rowKey, entityTable,
-                    compoundColumnQualifier, null, info.getValue());
+                    compoundColumnQualifierBytes, null, info.getValue());
               }
             } // for info: eventInfo
           }
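For readers following the write path above: the compound qualifier is eventId, one VALUES separator, the 8-byte inverted timestamp, another separator, and the (possibly empty) infoKey. The pure-Java sketch below shows that layout; it is illustrative only and sidesteps the quoting of separator bytes embedded in values that the real Separator.VALUES.join performs.

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class CompoundQualifierSketch {
      static final byte SEP = (byte) '='; // Separator.VALUES after this patch

      // Assembles eventId=invertedTimestamp=infoKey as raw bytes. A null
      // infoKey still leaves the trailing separator in place, matching
      // Separator.VALUES.join(columnQualifierWithTsBytes, null).
      static byte[] qualifier(String eventId, long ts, String infoKey) {
        byte[] id = eventId.getBytes(StandardCharsets.UTF_8);
        byte[] inv = ByteBuffer.allocate(8).putLong(Long.MAX_VALUE - ts).array();
        byte[] info = infoKey == null ? new byte[0]
            : infoKey.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(id.length + inv.length + info.length + 2)
            .put(id).put(SEP).put(inv).put(SEP).put(info).array();
      }

      public static void main(String[] args) {
        // 7 id bytes + 1 + 8 timestamp bytes + 1 + 9 info bytes = 26
        System.out.println(qualifier("created", 1436512802000L, "foo_event").length);
      }
    }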
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
index cd9e845..3821c92 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
@@ -111,6 +111,31 @@ private String getColumnPrefix() {
    * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
    */
   public void store(byte[] rowKey,
+      TypedBufferedMutator<ApplicationTable> tableMutator, byte[] qualifier,
+      Long timestamp, Object inputValue) throws IOException {
+
+    // Null check
+    if (qualifier == null) {
+      throw new IOException("Cannot store column with null qualifier in "
+          + tableMutator.getName().getNameAsString());
+    }
+
+    byte[] columnQualifier =
+        ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
+
+    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #store(byte[],
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
+   */
+  public void store(byte[] rowKey,
       TypedBufferedMutator<ApplicationTable> tableMutator, String qualifier,
       Long timestamp, Object inputValue) throws IOException {
@@ -155,6 +180,19 @@ public Object readResult(Result result, String qualifier) throws IOException {
    *
    * @see
    * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResults(org.apache.hadoop.hbase.client.Result)
+   */
+  public Map<byte[][], Object> readResultsHavingCompoundColumnQualifiers(
+      Result result) throws IOException {
+    return column.readResultsHavingCompoundColumnQualifiers(result,
+        columnPrefixBytes);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
    * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
    */
   public NavigableMap<String, NavigableMap<Long, Number>>
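The new byte[] store overload exists because the old code round-tripped the compound qualifier through Bytes.toString() before storing. The 8-byte inverted timestamp is almost never valid UTF-8, so that conversion was lossy (hence the "TODO: timestamp is not correct via ser/des through UTF-8 string" removed earlier in this patch). This standalone JDK-only snippet demonstrates the loss:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class Utf8RoundTripLoss {
      public static void main(String[] args) {
        long inverted = Long.MAX_VALUE - 1436512802000L;
        byte[] tsBytes = ByteBuffer.allocate(8).putLong(inverted).array();
        // String decoding maps invalid UTF-8 sequences to U+FFFD, so encoding
        // the String back does not reproduce the original bytes.
        byte[] roundTripped = new String(tsBytes, StandardCharsets.UTF_8)
            .getBytes(StandardCharsets.UTF_8);
        System.out.println(Arrays.equals(tsBytes, roundTripped)); // false
      }
    }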
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTable.java
index d2a2cb9..6ef0e3e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTable.java
@@ -62,7 +62,7 @@
  * | | s!isRelatedToKey:            | | |
  * | | id7?id9?id6                  | | |
  * | |                              | | |
- * | | e!eventId?timestamp?infoKey: | | |
+ * | | e!eventId=timestamp=infoKey: | | |
  * | | eventInfoValue               | | |
  * | |                              | | |
  * | | flowVersion:                 | | |
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
index a902924..ba7e13b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
@@ -207,6 +207,10 @@ public Object readResult(Result result, byte[] columnQualifierBytes)
         if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes)
             && columnNameParts.length == 2) {
           // This is the prefix that we want
+          // If the column name is a compound qualifier with non-String
+          // datatypes, the following decode will not work correctly, since
+          // it treats every component as a String. Invoke
+          // readResultsHavingCompoundColumnQualifiers instead.
           columnName = Separator.decode(columnNameParts[1]);
         }
       }
@@ -223,6 +227,51 @@
   }

   /**
+   * @param result
+   *          from which to read columns
+   * @param columnPrefixBytes
+   *          optional prefix to limit columns. If null, all columns are
+   *          returned.
+   * @return the latest values of columns in the column family. The column
+   *         qualifier is returned as a list of parts, each part a byte[].
+   *         This is to facilitate returning byte arrays of values that were
+   *         not Strings.
+   * @throws IOException
+   */
+  public Map<byte[][], Object> readResultsHavingCompoundColumnQualifiers(
+      Result result, byte[] columnPrefixBytes) throws IOException {
+    Map<byte[][], Object> results = new HashMap<byte[][], Object>();
+
+    if (result != null) {
+      Map<byte[], byte[]> columns = result.getFamilyMap(columnFamilyBytes);
+      for (Entry<byte[], byte[]> entry : columns.entrySet()) {
+        if (entry.getKey() != null && entry.getKey().length > 0) {
+          // A non-null prefix means columns are actually of the form
+          // prefix!columnNameRemainder. With a compound column qualifier,
+          // we presume the existence of a prefix.
+          byte[][] columnNameParts =
+              Separator.QUALIFIERS.split(entry.getKey(), 2);
+          if (columnNameParts.length > 0) {
+            byte[] actualColumnPrefixBytes = columnNameParts[0];
+            if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes)
+                && columnNameParts.length == 2) {
+              // This is the prefix that we want
+              byte[][] columnQualifierParts =
+                  Separator.VALUES.split(columnNameParts[1], -1);
+              Object value = GenericObjectMapper.read(entry.getValue());
+              // We return the column qualifier in parts since we don't know
+              // which part is of which data type.
+              results.put(columnQualifierParts, value);
+            }
+          }
+        }
+      } // for entry
+    }
+    return results;
+  }
+
+  /**
    * @param columnPrefixBytes The byte representation for the column prefix.
    *          Should not contain {@link Separator#QUALIFIERS}.
    * @param qualifier for the remainder of the column. Any
@@ -247,4 +296,24 @@ public Object readResult(Result result, byte[] columnQualifierBytes)
     return columnQualifier;
   }

+  /**
+   * @param columnPrefixBytes The byte representation for the column prefix.
+   *          Should not contain {@link Separator#QUALIFIERS}.
+   * @param qualifier the byte representation for the remainder of the
+   *          column.
+   * @return fully sanitized column qualifier that is a combination of prefix
+   *         and qualifier. If prefix is null, the result is simply the
+   *         encoded qualifier without any separator.
+   */
+  public static byte[] getColumnQualifier(byte[] columnPrefixBytes,
+      byte[] qualifier) {
+
+    if (columnPrefixBytes == null) {
+      return qualifier;
+    }
+
+    byte[] columnQualifier =
+        Separator.QUALIFIERS.join(columnPrefixBytes, qualifier);
+    return columnQualifier;
+  }
+
 }
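One design note on the Map<byte[][], Object> return type added above: Java arrays inherit identity-based equals/hashCode, so these keys cannot be looked up with a reconstructed byte[][]; callers must iterate entrySet(), which is exactly what readEvents and the updated tests do. A self-contained illustration:

    import java.util.HashMap;
    import java.util.Map;

    public class ArrayKeyLookup {
      public static void main(String[] args) {
        Map<byte[][], Object> results = new HashMap<>();
        results.put(new byte[][] { { 1 }, { 2 } }, "value");
        // get() misses: a structurally equal array is not the same reference.
        System.out.println(results.get(new byte[][] { { 1 }, { 2 } })); // null
        // Iteration is the supported access pattern.
        for (Map.Entry<byte[][], Object> e : results.entrySet()) {
          System.out.println(e.getValue()); // value
        }
      }
    }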
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
index 3319419..a3bfdbb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
@@ -37,7 +37,7 @@
   /**
    * separator in values, and/or compound key/column qualifier fields.
    */
-  VALUES("?", "%1$"),
+  VALUES("=", "%1$"),

   /**
    * separator in values, often used to avoid having these in qualifiers and
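The separator change above ("?" to "=") only swaps the delimiter; the quote string "%1$" is unchanged. For readers unfamiliar with the scheme, the sketch below shows the general quote-then-join idea in plain Java. It is a conceptual analogue, not the Separator implementation, and assumes the quote string simply stands in for embedded separators:

    public class SeparatorQuotingSketch {
      static final String SEP = "=";     // Separator.VALUES delimiter after this patch
      static final String QUOTE = "%1$"; // its quote string, per the enum constant

      static String encode(String v) { return v.replace(SEP, QUOTE); }
      static String decode(String v) { return v.replace(QUOTE, SEP); }

      public static void main(String[] args) {
        // A value containing the separator survives a join/split round trip.
        String joined = encode("a=b") + SEP + encode("c");
        String[] parts = joined.split(SEP, -1);
        System.out.println(decode(parts[0]) + " | " + decode(parts[1])); // a=b | c
      }
    }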
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
index c8485c0..b1fe3c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
@@ -131,6 +131,31 @@ public void store(byte[] rowKey,
    *
    * @see
    * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #store(byte[],
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
+   */
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<EntityTable> tableMutator, byte[] qualifier,
+      Long timestamp, Object inputValue) throws IOException {
+
+    // Null check
+    if (qualifier == null) {
+      throw new IOException("Cannot store column with null qualifier in "
+          + tableMutator.getName().getNameAsString());
+    }
+
+    byte[] columnQualifier =
+        ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
+
+    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
    * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
    */
   public Object readResult(Result result, String qualifier) throws IOException {
@@ -155,6 +180,19 @@ public Object readResult(Result result, String qualifier) throws IOException {
    *
    * @see
    * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResults(org.apache.hadoop.hbase.client.Result)
+   */
+  public Map<byte[][], Object> readResultsHavingCompoundColumnQualifiers(
+      Result result) throws IOException {
+    return column.readResultsHavingCompoundColumnQualifiers(result,
+        columnPrefixBytes);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
    * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
    */
   public NavigableMap<String, NavigableMap<Long, Number>>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
index 9a8bd8c..495c2a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
@@ -63,7 +63,7 @@
  * | | s!isRelatedToKey             | | |
  * | | id7?id9?id6                  | | |
  * | |                              | | |
- * | | e!eventId?timestamp?infoKey: | | |
+ * | | e!eventId=timestamp=infoKey: | | |
 * | | eventInfoValue               | | |
 * | |                              | | |
 * | | flowVersion:                 | | |
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
index 95f88d1..7c50559 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
@@ -27,8 +27,8 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.NavigableMap;
+import java.util.NavigableSet;
 import java.util.Set;

 import org.apache.hadoop.conf.Configuration;
@@ -44,6 +44,7 @@
 import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
@@ -101,8 +102,8 @@ public void testWriteApplicationToHBase() throws Exception {
     ApplicationEntity entity = new ApplicationEntity();
     String id = "hello";
     entity.setId(id);
-    Long cTime = 1425016501000L;
-    Long mTime = 1425026901000L;
+    long cTime = 1425016501000L;
+    long mTime = 1425026901000L;
     entity.setCreatedTime(cTime);
     entity.setModifiedTime(mTime);
@@ -197,19 +198,16 @@ public void testWriteApplicationToHBase() throws Exception {

       Number val =
           (Number) ApplicationColumn.CREATED_TIME.readResult(result);
-      Long cTime1 = val.longValue();
+      long cTime1 = val.longValue();
       assertEquals(cTime1, cTime);

       val = (Number) ApplicationColumn.MODIFIED_TIME.readResult(result);
-      Long mTime1 = val.longValue();
+      long mTime1 = val.longValue();
       assertEquals(mTime1, mTime);

       Map<String, Object> infoColumns =
           ApplicationColumnPrefix.INFO.readResults(result);
-      assertEquals(infoMap.size(), infoColumns.size());
-      for (String infoItem : infoMap.keySet()) {
-        assertEquals(infoMap.get(infoItem), infoColumns.get(infoItem));
-      }
+      assertEquals(infoMap, infoColumns);

       // Remember isRelatedTo is of type Map<String, Set<String>>
       for (String isRelatedToKey : isRelatedTo.keySet()) {
@@ -245,27 +243,15 @@ public void testWriteApplicationToHBase() throws Exception {

       // Configuration
       Map<String, Object> configColumns =
           ApplicationColumnPrefix.CONFIG.readResults(result);
-      assertEquals(conf.size(), configColumns.size());
-      for (String configItem : conf.keySet()) {
-        assertEquals(conf.get(configItem), configColumns.get(configItem));
-      }
+      assertEquals(conf, configColumns);

       NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
           ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result);

       NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
-      // We got metrics back
-      assertNotNull(metricMap);
-      // Same number of metrics as we wrote
-      assertEquals(metricValues.entrySet().size(), metricMap.entrySet().size());
-
-      // Iterate over original metrics and confirm that they are present
-      // here.
-      for (Entry<Long, Number> metricEntry : metricValues.entrySet()) {
-        assertEquals(metricEntry.getValue(),
-            metricMap.get(metricEntry.getKey()));
-      }
+      assertEquals(metricValues, metricMap);

+      // read the timeline entity using the reader this time
       TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, id,
           entity.getType(), entity.getId(),
           EnumSet.of(TimelineReader.Field.ALL));
@@ -274,6 +260,31 @@ public void testWriteApplicationToHBase() throws Exception {
           null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
       assertNotNull(e1);
       assertEquals(1, es1.size());
+
+      // verify attributes
+      assertEquals(id, e1.getId());
+      assertEquals(TimelineEntityType.YARN_APPLICATION.toString(),
+          e1.getType());
+      assertEquals(cTime, e1.getCreatedTime());
+      assertEquals(mTime, e1.getModifiedTime());
+      Map<String, Object> infoMap2 = e1.getInfo();
+      assertEquals(infoMap, infoMap2);
+
+      Map<String, Set<String>> isRelatedTo2 = e1.getIsRelatedToEntities();
+      assertEquals(isRelatedTo, isRelatedTo2);
+
+      Map<String, Set<String>> relatesTo2 = e1.getRelatesToEntities();
+      assertEquals(relatesTo, relatesTo2);
+
+      Map<String, String> conf2 = e1.getConfigs();
+      assertEquals(conf, conf2);
+
+      Set<TimelineMetric> metrics2 = e1.getMetrics();
+      assertEquals(metrics, metrics2);
+      for (TimelineMetric metric2 : metrics2) {
+        Map<Long, Number> metricValues2 = metric2.getValues();
+        assertEquals(metricValues, metricValues2);
+      }
     } finally {
       if (hbi != null) {
         hbi.stop();
@@ -294,8 +305,8 @@ public void testWriteEntityToHBase() throws Exception {
     String type = "world";
     entity.setId(id);
     entity.setType(type);
-    Long cTime = 1425016501000L;
-    Long mTime = 1425026901000L;
+    long cTime = 1425016501000L;
+    long mTime = 1425026901000L;
     entity.setCreatedTime(cTime);
     entity.setModifiedTime(mTime);
@@ -396,20 +407,16 @@ public void testWriteEntityToHBase() throws Exception {
           assertEquals(type, type1);

           Number val = (Number) EntityColumn.CREATED_TIME.readResult(result);
-          Long cTime1 = val.longValue();
+          long cTime1 = val.longValue();
           assertEquals(cTime1, cTime);

           val = (Number) EntityColumn.MODIFIED_TIME.readResult(result);
-          Long mTime1 = val.longValue();
+          long mTime1 = val.longValue();
           assertEquals(mTime1, mTime);

           Map<String, Object> infoColumns =
               EntityColumnPrefix.INFO.readResults(result);
-          assertEquals(infoMap.size(), infoColumns.size());
-          for (String infoItem : infoMap.keySet()) {
-            assertEquals(infoMap.get(infoItem),
-                infoColumns.get(infoItem));
-          }
+          assertEquals(infoMap, infoColumns);

           // Remember isRelatedTo is of type Map<String, Set<String>>
           for (String isRelatedToKey : isRelatedTo.keySet()) {
@@ -447,32 +454,19 @@ public void testWriteEntityToHBase() throws Exception {

           // Configuration
           Map<String, Object> configColumns =
               EntityColumnPrefix.CONFIG.readResults(result);
-          assertEquals(conf.size(), configColumns.size());
-          for (String configItem : conf.keySet()) {
-            assertEquals(conf.get(configItem), configColumns.get(configItem));
-          }
+          assertEquals(conf, configColumns);

           NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
               EntityColumnPrefix.METRIC.readResultsWithTimestamps(result);

           NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
-          // We got metrics back
-          assertNotNull(metricMap);
-          // Same number of metrics as we wrote
-          assertEquals(metricValues.entrySet().size(), metricMap.entrySet()
-              .size());
-
-          // Iterate over original metrics and confirm that they are present
-          // here.
-          for (Entry<Long, Number> metricEntry : metricValues.entrySet()) {
-            assertEquals(metricEntry.getValue(),
-                metricMap.get(metricEntry.getKey()));
-          }
+          assertEquals(metricValues, metricMap);
         }
       }
       assertEquals(1, rowCount);
       assertEquals(17, colCount);

+      // read the timeline entity using the reader this time
       TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
           entity.getType(), entity.getId(),
           EnumSet.of(TimelineReader.Field.ALL));
@@ -481,6 +475,30 @@ public void testWriteEntityToHBase() throws Exception {
           null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
       assertNotNull(e1);
       assertEquals(1, es1.size());
+
+      // verify attributes
+      assertEquals(id, e1.getId());
+      assertEquals(type, e1.getType());
+      assertEquals(cTime, e1.getCreatedTime());
+      assertEquals(mTime, e1.getModifiedTime());
+      Map<String, Object> infoMap2 = e1.getInfo();
+      assertEquals(infoMap, infoMap2);
+
+      Map<String, Set<String>> isRelatedTo2 = e1.getIsRelatedToEntities();
+      assertEquals(isRelatedTo, isRelatedTo2);
+
+      Map<String, Set<String>> relatesTo2 = e1.getRelatesToEntities();
+      assertEquals(relatesTo, relatesTo2);
+
+      Map<String, String> conf2 = e1.getConfigs();
+      assertEquals(conf, conf2);
+
+      Set<TimelineMetric> metrics2 = e1.getMetrics();
+      assertEquals(metrics, metrics2);
+      for (TimelineMetric metric2 : metrics2) {
+        Map<Long, Number> metricValues2 = metric2.getValues();
+        assertEquals(metricValues, metricValues2);
+      }
     } finally {
       if (hbi != null) {
         hbi.stop();
@@ -494,7 +512,7 @@ public void testWriteEntityToHBase() throws Exception {
   }

   private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
-      String flow, Long runid, String appName, TimelineEntity te) {
+      String flow, long runid, String appName, TimelineEntity te) {

     byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1);
@@ -511,7 +529,7 @@ private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
   }

   private boolean isApplicationRowKeyCorrect(byte[] rowKey, String cluster,
-      String user, String flow, Long runid, String appName) {
+      String user, String flow, long runid, String appName) {

     byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1);
@@ -530,7 +548,7 @@ public void testEvents() throws IOException {
     TimelineEvent event = new TimelineEvent();
     String eventId = ApplicationMetricsConstants.CREATED_EVENT_TYPE;
     event.setId(eventId);
-    Long expTs = 1436512802000L;
+    long expTs = 1436512802000L;
     event.setTimestamp(expTs);
     String expKey = "foo_event";
     Object expVal = "test";
@@ -577,24 +595,25 @@ public void testEvents() throws IOException {
         assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow,
             runid, appName));

-        Map<String, Object> eventsResult =
-            ApplicationColumnPrefix.EVENT.readResults(result);
+        Map<byte[][], Object> eventsResult =
+            ApplicationColumnPrefix.EVENT.
+                readResultsHavingCompoundColumnQualifiers(result);
         // there should be only one event
         assertEquals(1, eventsResult.size());
-        // key name for the event
-        byte[] compoundColumnQualifierBytes =
-            Separator.VALUES.join(Bytes.toBytes(eventId),
-                Bytes.toBytes(TimelineWriterUtils.invert(expTs)),
-                Bytes.toBytes(expKey));
-        String valueKey = Bytes.toString(compoundColumnQualifierBytes);
-        for (Map.Entry<String, Object> e : eventsResult.entrySet()) {
-          // the value key must match
-          assertEquals(valueKey, e.getKey());
+        for (Map.Entry<byte[][], Object> e : eventsResult.entrySet()) {
+          // the qualifier is a compound key
+          // hence match individual values
+          byte[][] karr = e.getKey();
+          assertEquals(3, karr.length);
+          assertEquals(eventId, Bytes.toString(karr[0]));
+          assertEquals(TimelineWriterUtils.invert(expTs), Bytes.toLong(karr[1]));
+          assertEquals(expKey, Bytes.toString(karr[2]));
           Object value = e.getValue();
           // there should be only one timestamp and value
           assertEquals(expVal, value.toString());
         }

+        // read the timeline entity using the reader this time
         TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
             entity.getType(), entity.getId(),
             EnumSet.of(TimelineReader.Field.ALL));
@@ -613,6 +632,21 @@ public void testEvents() throws IOException {
       assertEquals(1, es1.size());
       assertEquals(1, es2.size());
       assertEquals(es1, es2);
+
+      // check the events
+      NavigableSet<TimelineEvent> events = e1.getEvents();
+      // there should be only one event
+      assertEquals(1, events.size());
+      for (TimelineEvent e : events) {
+        assertEquals(eventId, e.getId());
+        assertEquals(expTs, e.getTimestamp());
+        Map<String, Object> info = e.getInfo();
+        assertEquals(1, info.size());
+        for (Map.Entry<String, Object> infoEntry : info.entrySet()) {
+          assertEquals(expKey, infoEntry.getKey());
+          assertEquals(expVal, infoEntry.getValue());
+        }
+      }
     } finally {
       if (hbi != null) {
         hbi.stop();
@@ -630,7 +664,7 @@ public void testEventsWithEmptyInfo() throws IOException {
     TimelineEvent event = new TimelineEvent();
     String eventId = "foo_event_id";
     event.setId(eventId);
-    Long expTs = 1436512802000L;
+    long expTs = 1436512802000L;
     event.setTimestamp(expTs);

     final TimelineEntity entity = new TimelineEntity();
@@ -678,22 +712,21 @@ public void testEventsWithEmptyInfo() throws IOException {
         assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName,
             entity));

-        Map<String, Object> eventsResult =
-            EntityColumnPrefix.EVENT.readResults(result);
+        Map<byte[][], Object> eventsResult =
+            EntityColumnPrefix.EVENT.
+                readResultsHavingCompoundColumnQualifiers(result);
         // there should be only one event
         assertEquals(1, eventsResult.size());
-        // key name for the event
-        byte[] compoundColumnQualifierWithTsBytes =
-            Separator.VALUES.join(Bytes.toBytes(eventId),
-                Bytes.toBytes(TimelineWriterUtils.invert(expTs)));
-        byte[] compoundColumnQualifierBytes =
-            Separator.VALUES.join(compoundColumnQualifierWithTsBytes,
-                null);
-        String valueKey = Bytes.toString(compoundColumnQualifierBytes);
-        for (Map.Entry<String, Object> e :
-            eventsResult.entrySet()) {
-          // the column qualifier key must match
-          assertEquals(valueKey, e.getKey());
+        for (Map.Entry<byte[][], Object> e : eventsResult.entrySet()) {
+          // the qualifier is a compound key
+          // hence match individual values
+          byte[][] karr = e.getKey();
+          assertEquals(3, karr.length);
+          assertEquals(eventId, Bytes.toString(karr[0]));
+          assertEquals(TimelineWriterUtils.invert(expTs),
+              Bytes.toLong(karr[1]));
+          // key must be empty
+          assertEquals(0, karr[2].length);
           Object value = e.getValue();
           // value should be empty
           assertEquals("", value.toString());
@@ -702,6 +735,7 @@ public void testEventsWithEmptyInfo() throws IOException {
       }
       assertEquals(1, rowCount);

+      // read the timeline entity using the reader this time
       TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
           entity.getType(), entity.getId(),
           EnumSet.of(TimelineReader.Field.ALL));
@@ -710,6 +744,17 @@ public void testEventsWithEmptyInfo() throws IOException {
           null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
       assertNotNull(e1);
       assertEquals(1, es1.size());
+
+      // check the events
+      NavigableSet<TimelineEvent> events = e1.getEvents();
+      // there should be only one event
+      assertEquals(1, events.size());
+      for (TimelineEvent e : events) {
+        assertEquals(eventId, e.getId());
+        assertEquals(expTs, e.getTimestamp());
+        Map<String, Object> info = e.getInfo();
+        assertTrue(info == null || info.isEmpty());
+      }
     } finally {
       hbi.stop();
       hbi.close();
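A closing note on testEventsWithEmptyInfo: the writer joins the eventId=timestamp prefix with a null tail, which still appends a trailing separator, and the reader splits with limit -1, which preserves trailing empty components. That is why the test expects three parts with a zero-length third one. A string-level analogue of the byte-level behavior, runnable as-is:

    public class TrailingEmptyPart {
      public static void main(String[] args) {
        String qualifierWithTs = "eventId=invertedTs"; // stand-in for raw bytes
        String compound = qualifierWithTs + "=";       // join(..., null)
        String[] parts = compound.split("=", -1);      // limit -1 keeps trailing ""
        System.out.println(parts.length);              // 3
        System.out.println(parts[2].isEmpty());        // true
      }
    }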