diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index 3173e87..bb67b8a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -212,21 +212,16 @@ private void storeEvents(byte[] rowKey, Set events) byte[] compoundColumnQualifierBytes = Separator.VALUES.join(columnQualifierWithTsBytes, null); - String compoundColumnQualifier = - Bytes.toString(compoundColumnQualifierBytes); EntityColumnPrefix.EVENT.store(rowKey, entityTable, - compoundColumnQualifier, null, TimelineWriterUtils.EMPTY_BYTES); + compoundColumnQualifierBytes, null, TimelineWriterUtils.EMPTY_BYTES); } else { for (Map.Entry info : eventInfo.entrySet()) { // eventId?infoKey byte[] compoundColumnQualifierBytes = Separator.VALUES.join(columnQualifierWithTsBytes, Bytes.toBytes(info.getKey())); - // convert back to string to avoid additional API on store. - String compoundColumnQualifier = - Bytes.toString(compoundColumnQualifierBytes); EntityColumnPrefix.EVENT.store(rowKey, entityTable, - compoundColumnQualifier, null, info.getValue()); + compoundColumnQualifierBytes, null, info.getValue()); } // for info: eventInfo } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java index a902924..f5658df 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java @@ -207,6 +207,10 @@ public Object readResult(Result result, byte[] columnQualifierBytes) if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes) && columnNameParts.length == 2) { // This is the prefix that we want + // if the column name is a compound qualifier + // with non string datatypes, the following decode will not + // work correctly since it considers all components to be String + // invoke the readResultsHavingCompoundColumnQualifiers function columnName = Separator.decode(columnNameParts[1]); } } @@ -223,6 +227,51 @@ public Object readResult(Result result, byte[] columnQualifierBytes) } /** + * @param result + * from which to read columns + * @param columnPrefixBytes + * optional prefix to limit columns. If null all columns are + * returned. + * @return the latest values of columns in the column family. The column + * qualifier is returned as a list of parts, each part a byte[]. This + * is to facilitate returning byte arrays of values that were not + * Strings + * @throws IOException + */ + public Map readResultsHavingCompoundColumnQualifiers( + Result result, byte[] columnPrefixBytes) throws IOException { + Map results = new HashMap(); + + if (result != null) { + Map columns = result.getFamilyMap(columnFamilyBytes); + for (Entry entry : columns.entrySet()) { + if (entry.getKey() != null && entry.getKey().length > 0) { + // A non-null prefix means columns are actually of the form + // prefix!columnNameRemainder + // with a compound column qualifier, we are presuming existence of a + // prefix + byte[][] columnNameParts = Separator.QUALIFIERS.split(entry.getKey(), + 2); + if (columnNameParts.length > 0) { + byte[] actualColumnPrefixBytes = columnNameParts[0]; + if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes) + && columnNameParts.length == 2) { + // This is the prefix that we want + byte[][] columnQualifierParts = Separator.VALUES.split( + columnNameParts[1], -1); + Object value = GenericObjectMapper.read(entry.getValue()); + // we return the columnQualifier in parts since we don't know which + // part is of which data type + results.put(columnQualifierParts, value); + } + } + } + } // for entry + } + return results; + } + + /** * @param columnPrefixBytes The byte representation for the column prefix. * Should not contain {@link Separator#QUALIFIERS}. * @param qualifier for the remainder of the column. Any @@ -247,4 +296,24 @@ public Object readResult(Result result, byte[] columnQualifierBytes) return columnQualifier; } + /** + * @param columnPrefixBytes The byte representation for the column prefix. + * Should not contain {@link Separator#QUALIFIERS}. + * @param qualifier the byte representation for the remainder of the column. + * @return fully sanitized column qualifier that is a combination of prefix + * and qualifier. If prefix is null, the result is simply the encoded + * qualifier without any separator. + */ + public static byte[] getColumnQualifier(byte[] columnPrefixBytes, + byte[] qualifier) { + + if (columnPrefixBytes == null) { + return qualifier; + } + + byte[] columnQualifier = + Separator.QUALIFIERS.join(columnPrefixBytes, qualifier); + return columnQualifier; + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java index 3319419..a3bfdbb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java @@ -37,7 +37,7 @@ /** * separator in values, and/or compound key/column qualifier fields. */ - VALUES("?", "%1$"), + VALUES("=", "%1$"), /** * separator in values, often used to avoid having these in qualifiers and diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java index 8b7bc3e..835aed5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java @@ -131,6 +131,31 @@ public void store(byte[] rowKey, * * @see * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #store(byte[], + * org.apache.hadoop.yarn.server.timelineservice.storage.common. + * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object) + */ + public void store(byte[] rowKey, + TypedBufferedMutator tableMutator, byte[] qualifier, + Long timestamp, Object inputValue) throws IOException { + + // Null check + if (qualifier == null) { + throw new IOException("Cannot store column with null qualifier in " + + tableMutator.getName().getNameAsString()); + } + + byte[] columnQualifier = + ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); + + column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String) */ public Object readResult(Result result, String qualifier) throws IOException { @@ -155,6 +180,19 @@ public Object readResult(Result result, String qualifier) throws IOException { * * @see * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #readResults(org.apache.hadoop.hbase.client.Result) + */ + public Map readResultsHavingCompoundColumnQualifiers( + Result result) throws IOException { + return column.readResultsHavingCompoundColumnQualifiers(result, + columnPrefixBytes); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result) */ public NavigableMap> diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java index fd5643d..d528824 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java @@ -259,10 +259,6 @@ public void testWriteEntityToHBase() throws Exception { hbi.stop(); hbi.close(); } - - // Somewhat of a hack, not a separate test in order not to have to deal with - // test case order exectution. - testAdditionalEntity(); } private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user, @@ -281,7 +277,8 @@ private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user, return true; } - private void testAdditionalEntity() throws IOException { + @Test + public void testAdditionalEntity() throws IOException { TimelineEvent event = new TimelineEvent(); String eventId = "foo_event_id"; event.setId(eventId); @@ -304,12 +301,12 @@ private void testAdditionalEntity() throws IOException { Configuration c1 = util.getConfiguration(); hbi = new HBaseTimelineWriterImpl(c1); hbi.init(c1); - String cluster = "cluster2"; - String user = "user2"; + String cluster = "event_test_cluster2"; + String user = "event_test_user"; String flow = "other_flow_name"; String flowVersion = "1111F01C2287BA"; long runid = 1009876543218L; - String appName = "some app name"; + String appName = "some app name event test"; hbi.write(cluster, user, flow, flowVersion, runid, appName, entities); hbi.stop(); // scan the table and see that entity exists @@ -330,20 +327,20 @@ private void testAdditionalEntity() throws IOException { assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName, entity)); - Map eventsResult = - EntityColumnPrefix.EVENT.readResults(result); + Map eventsResult = + EntityColumnPrefix.EVENT.readResultsHavingCompoundColumnQualifiers(result); // there should be only one event assertEquals(1, eventsResult.size()); // key name for the event - byte[] compoundColumnQualifierBytes = - Separator.VALUES.join(Bytes.toBytes(eventId), - Bytes.toBytes(TimelineWriterUtils.invert(expTs)), - Bytes.toBytes(expKey)); - String valueKey = Bytes.toString(compoundColumnQualifierBytes); - for (Map.Entry e : + for (Map.Entry e : eventsResult.entrySet()) { - // the value key must match - assertEquals(valueKey, e.getKey()); + // the qualifier is a compound key + // hence match individual values + byte[][] karr = (byte[][]) e.getKey(); + assertEquals(3, karr.length); + assertEquals(eventId, Bytes.toString(karr[0])); + assertEquals(TimelineWriterUtils.invert(expTs), Bytes.toLong(karr[1])); + assertEquals(expKey, Bytes.toString(karr[2])); Object value = e.getValue(); // there should be only one timestamp and value assertEquals(expVal, value.toString());