diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java index 8ab54bc..9504393 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java @@ -40,9 +40,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; @@ -50,25 +48,28 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName; +import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; @@ -602,7 +603,8 @@ public void testWriteApplicationToHBase() throws Exception { assertEquals(cTime1, cTime); Map infoColumns = - ApplicationColumnPrefix.INFO.readResults(result); + ApplicationColumnPrefix.INFO.readResults(result, + StringKeyConverter.getInstance()); assertEquals(infoMap, infoColumns); // Remember isRelatedTo is of type Map> @@ -638,7 +640,7 @@ public void testWriteApplicationToHBase() throws Exception { // Configuration Map configColumns = - ApplicationColumnPrefix.CONFIG.readResults(result); + ApplicationColumnPrefix.CONFIG.readResults(result, null); assertEquals(conf, configColumns); NavigableMap> metricsResult = @@ -801,7 +803,8 @@ public void testWriteEntityToHBase() throws Exception { assertEquals(cTime1, cTime); Map infoColumns = - EntityColumnPrefix.INFO.readResults(result); + EntityColumnPrefix.INFO.readResults(result, + StringKeyConverter.getInstance()); assertEquals(infoMap, infoColumns); // Remember isRelatedTo is of type Map> @@ -839,7 +842,7 @@ public void testWriteEntityToHBase() throws Exception { // Configuration Map configColumns = - EntityColumnPrefix.CONFIG.readResults(result); + EntityColumnPrefix.CONFIG.readResults(result, null); assertEquals(conf, configColumns); NavigableMap> metricsResult = @@ -971,20 +974,18 @@ public void testEvents() throws IOException { assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, runid, appName)); - Map eventsResult = - ApplicationColumnPrefix.EVENT. - readResultsHavingCompoundColumnQualifiers(result); + Map eventsResult = + ApplicationColumnPrefix.EVENT.readResults(result, + EventColumnNameConverter.getInstance()); // there should be only one event assertEquals(1, eventsResult.size()); - for (Map.Entry e : eventsResult.entrySet()) { + for (Map.Entry e : eventsResult.entrySet()) { + EventColumnName eventColumnName = e.getKey(); // the qualifier is a compound key // hence match individual values - byte[][] karr = (byte[][])e.getKey(); - assertEquals(3, karr.length); - assertEquals(eventId, Bytes.toString(karr[0])); - assertEquals( - TimelineStorageUtils.invertLong(expTs), Bytes.toLong(karr[1])); - assertEquals(expKey, Bytes.toString(karr[2])); + assertEquals(eventId, eventColumnName.getId()); + assertEquals(expTs, eventColumnName.getTimestamp()); + assertEquals(expKey, eventColumnName.getInfoKey()); Object value = e.getValue(); // there should be only one timestamp and value assertEquals(expVal, value.toString()); @@ -1075,21 +1076,19 @@ public void testEventsWithEmptyInfo() throws IOException { assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName, entity)); - Map eventsResult = - EntityColumnPrefix.EVENT. 
-            readResultsHavingCompoundColumnQualifiers(result);
+    Map<EventColumnName, Object> eventsResult =
+        EntityColumnPrefix.EVENT.readResults(result,
+            EventColumnNameConverter.getInstance());
     // there should be only one event
     assertEquals(1, eventsResult.size());
-    for (Map.Entry<?, Object> e : eventsResult.entrySet()) {
+    for (Map.Entry<EventColumnName, Object> e : eventsResult.entrySet()) {
+      EventColumnName eventColumnName = e.getKey();
       // the qualifier is a compound key
       // hence match individual values
-      byte[][] karr = (byte[][])e.getKey();
-      assertEquals(3, karr.length);
-      assertEquals(eventId, Bytes.toString(karr[0]));
-      assertEquals(TimelineStorageUtils.invertLong(expTs),
-          Bytes.toLong(karr[1]));
+      assertEquals(eventId, eventColumnName.getId());
+      assertEquals(expTs, eventColumnName.getTimestamp());
       // key must be empty
-      assertEquals(0, karr[2].length);
+      assertEquals(null, eventColumnName.getInfoKey());
       Object value = e.getValue();
       // value should be empty
       assertEquals("", value.toString());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index 172f982..2e6352f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -417,14 +417,14 @@ private void storeEvents(byte[] rowKey, Set<TimelineEvent> events,
                 getCompoundColQualBytes(eventId, eventTs, null);
             ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
                 compoundColumnQualifierBytes, null,
-                TimelineStorageUtils.EMPTY_BYTES);
+                Separator.EMPTY_BYTES);
           } else {
             byte[] compoundColumnQualifierBytes =
                 EntityColumnPrefix.EVENT.
getCompoundColQualBytes(eventId, eventTs, null); EntityColumnPrefix.EVENT.store(rowKey, entityTable, compoundColumnQualifierBytes, null, - TimelineStorageUtils.EMPTY_BYTES); + Separator.EMPTY_BYTES); } } else { for (Map.Entry info : eventInfo.entrySet()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java index 1dfc4db..cc91e0c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java @@ -27,9 +27,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; @@ -232,25 +233,12 @@ public ValueConverter getValueConverter() { * * @see * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix - * #readResults(org.apache.hadoop.hbase.client.Result) + * #readResults(org.apache.hadoop.hbase.client.Result, + * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter) */ - public Map readResults(Result result) throws IOException { - return column.readResults(result, columnPrefixBytes); - } - - /** - * @param result from which to read columns - * @return the latest values of columns in the column family. The column - * qualifier is returned as a list of parts, each part a byte[]. This - * is to facilitate returning byte arrays of values that were not - * Strings. If they can be treated as Strings, you should use - * {@link #readResults(Result)} instead. - * @throws IOException if any problem occurs while reading results. 
-   */
-  public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result)
-      throws IOException {
-    return column.readResultsHavingCompoundColumnQualifiers(result,
-        columnPrefixBytes);
+  public <K> Map<K, Object> readResults(Result result,
+      KeyConverter<K> keyConverter) throws IOException {
+    return column.readResults(result, columnPrefixBytes, keyConverter);
   }
 
   /*
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
index dff677b..4a9053a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
@@ -290,6 +290,72 @@ public Object readResult(Result result, byte[] columnQualifierBytes)
   }
 
   /**
+   * @param <K> identifies the type of column name (key).
+   * @param result from which to read columns
+   * @param columnPrefixBytes optional prefix to limit columns. If null all
+   *          columns are returned.
+   * @param keyConverter used to convert column bytes to the appropriate key
+   *          type.
+   * @return the latest values of columns in the column family, keyed by the
+   *         column name decoded with the given key converter.
+   * @throws IOException if any problem occurs while reading results.
+   */
+  public <K> Map<K, Object> readResults(Result result,
+      byte[] columnPrefixBytes, KeyConverter<K> keyConverter)
+      throws IOException {
+    Map<K, Object> results = new HashMap<K, Object>();
+
+    if (result != null) {
+      Map<byte[], byte[]> columns = result.getFamilyMap(columnFamilyBytes);
+      for (Entry<byte[], byte[]> entry : columns.entrySet()) {
+        byte[] columnKey = entry.getKey();
+        if (columnKey != null && columnKey.length > 0) {
+
+          K convertedColumnKey = null;
+          if (columnPrefixBytes == null) {
+            LOG.info("null prefix was specified; returning all columns");
+            try {
+              convertedColumnKey = keyConverter.decode(columnKey);
+            } catch (IllegalArgumentException iae) {
+              LOG.error("Illegal column found, skipping this column.", iae);
+              continue;
+            }
+          } else {
+            // A non-null prefix means columns are actually of the form
+            // prefix!columnNameRemainder
+            byte[][] columnNameParts =
+                Separator.QUALIFIERS.split(columnKey, 2);
+            if (columnNameParts.length > 0) {
+              byte[] actualColumnPrefixBytes = columnNameParts[0];
+              // If this is the prefix that we want
+              if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes)
+                  && columnNameParts.length == 2) {
+                try {
+                  convertedColumnKey = keyConverter.decode(columnNameParts[1]);
+                } catch (IllegalArgumentException iae) {
+                  LOG.error("Illegal column found, skipping this column.",
+                      iae);
+                  continue;
+                }
+              }
+            }
+          } // if-else
+
+          // If the columnPrefix is null (we want all columns), or the actual
+          // prefix matched the given prefix, keep the decoded column name
+          // and its decoded value
+          if (convertedColumnKey != null) {
+            Object value = converter.decodeValue(entry.getValue());
+            results.put(convertedColumnKey, value);
+          }
+        }
+      } // for entry
+    }
+    return results;
+  }
+
+  /**
    * @param result from which to read columns
    * @param columnPrefixBytes optional prefix to limit columns. If null all
    *          columns are returned.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
index e4b7f16..caf8430 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
@@ -92,12 +92,15 @@ void store(byte[] rowKey, TypedBufferedMutator tableMutator,
 
   /**
    * @param result from which to read columns
+   * @param keyConverter used to convert column bytes to the appropriate key
+   *          type
    * @return the latest values of columns in the column family with this prefix
    *         (or all of them if the prefix value is null).
    * @throws IOException if there is any exception encountered while reading
-   *     results.
+   *           results.
    */
-  Map<String, Object> readResults(Result result) throws IOException;
+  <K> Map<K, Object> readResults(Result result, KeyConverter<K> keyConverter)
+      throws IOException;
 
   /**
    * @param result from which to reads data with timestamps
@@ -112,18 +115,6 @@ void store(byte[] rowKey, TypedBufferedMutator tableMutator,
 
   /**
-   * @param result from which to read columns
-   * @return the latest values of columns in the column family. The column
-   *         qualifier is returned as a list of parts, each part a byte[]. This
-   *         is to facilitate returning byte arrays of values that were not
-   *         Strings. If they can be treated as Strings, you should use
-   *         {@link #readResults(Result)} instead.
-   * @throws IOException if any problem occurs while reading results.
-   */
-  Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result)
-      throws IOException;
-
-  /**
   * @param qualifierPrefix Column qualifier or prefix of qualifier.
   * @return a byte array encoding column prefix and qualifier/prefix passed.
   */
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java
new file mode 100644
index 0000000..4f3d6fc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+/**
+ * Encapsulates the name of an event column: the event id, the event
+ * timestamp and an optional info key.
+ */
+public class EventColumnName {
+
+  private final String id;
+  private final long timestamp;
+  private final String infoKey;
+
+  public EventColumnName(String id, long timestamp, String infoKey) {
+    this.id = id;
+    this.timestamp = timestamp;
+    this.infoKey = infoKey;
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public String getInfoKey() {
+    return infoKey;
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
new file mode 100644
index 0000000..f8d25af
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Decodes an event column qualifier of the form
+ * eventId=invertedTimestamp=infoKey into an {@link EventColumnName}.
+ */
+public class EventColumnNameConverter
+    implements KeyConverter<EventColumnName> {
+
+  private static final EventColumnNameConverter INSTANCE =
+      new EventColumnNameConverter();
+
+  public static final EventColumnNameConverter getInstance() {
+    return INSTANCE;
+  }
+
+  private EventColumnNameConverter() {
+  }
+
+  // eventId=timestamp=infoKey are of types String, Long and String.
+  // Strings are unbounded in size, so they are marked with NO_LIMIT_SPLIT.
+  private static final int[] SIZES = {
+      Separator.NO_LIMIT_SPLIT, (Long.SIZE / Byte.SIZE),
+      Separator.NO_LIMIT_SPLIT };
+
+  @Override
+  public EventColumnName decode(byte[] bytes) {
+    byte[][] components = Separator.VALUES.split(bytes, SIZES);
+    if (components.length != 3) {
+      throw new IllegalArgumentException("the column name is not valid");
+    }
+
+    String id = Bytes.toString(components[0]);
+    long ts = TimelineStorageUtils.invertLong(Bytes.toLong(components[1]));
+    String infoKey =
+        components[2].length == 0 ? null : Bytes.toString(components[2]);
+    return new EventColumnName(id, ts, infoKey);
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/KeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/KeyConverter.java
new file mode 100644
index 0000000..5c15810
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/KeyConverter.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+/**
+ * Converts a byte array representation of a column name back into a typed
+ * key.
+ */
+public interface KeyConverter<K> {
+
+  /**
+   * @param bytes byte representation of the key.
+   * @return the key parsed from the input bytes.
+   */
+  K decode(byte[] bytes);
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
index a81c717..cdbb0c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
@@ -67,7 +67,11 @@
    */
   private final String quotedValue;
 
-  private static final byte[] EMPTY_BYTES = new byte[0];
+  /** indicator for no limits for splitting. */
+  public static final int NO_LIMIT_SPLIT = 0;
+
+  /** empty bytes. */
+  public static final byte[] EMPTY_BYTES = new byte[0];
 
   /**
    * @param value of the separator to use. Cannot be null or empty string.
@@ -307,7 +311,19 @@ public String joinEncoded(Iterable<?> items) {
    * @return source split by this separator.
    */
   public byte[][] split(byte[] source, int limit) {
-    return TimelineStorageUtils.split(source, this.bytes, limit);
+    return split(source, this.bytes, limit);
+  }
+
+  /**
+   * Splits the source array into multiple array segments using this
+   * separator. The sizes indicate the sizes of the relative components, in
+   * case one of the components actually contains this separator.
+   * Variable-length components that cannot contain this separator are
+   * indicated with a size of {@value #NO_LIMIT_SPLIT}.
+   *
+   * @param source the byte array to split.
+   * @param sizes the expected sizes of the individual components.
+   * @return source split by this separator into segments of the given sizes.
+   */
+  public byte[][] split(byte[] source, int[] sizes) {
+    return split(source, this.bytes, sizes);
   }
 
   /**
@@ -319,6 +335,178 @@ public String joinEncoded(Iterable<?> items) {
    * @return source split by this separator.
*/ public byte[][] split(byte[] source) { - return TimelineStorageUtils.split(source, this.bytes); + return split(source, this.bytes); + } + + /** + * TODO add proper javadoc + */ + public static List splitRanges(byte[] source, byte[] separator, + int[] sizes) { + List segments = new ArrayList(); + if (source == null || separator == null || sizes == null) { + return segments; + } + int start = 0; + int i = 0; + int k = 0; + itersource: while (i < source.length && segments.size() < sizes.length) { + int currentTokenSize = sizes[k]; + if (currentTokenSize > 0) { + // we explicitly grab a fixed number of bytes + if (start + currentTokenSize > source.length) { + // it's seeking beyond the source boundary + throw new IllegalArgumentException("source is " + source.length + + " bytes long and we're asking for " + (start + currentTokenSize)); + } + segments.add(new Range(start, start + currentTokenSize)); + start += currentTokenSize; + i += currentTokenSize; + k++; + // if there is more to parse, there must be a separator; strip it + if (k <= sizes.length - 1) { + for (int j = 0; j < separator.length; j++) { + if (source[i + j] != separator[j]) { + throw new IllegalArgumentException("separator is expected"); + } + continue; + } + // matched the separator + start = i + separator.length; + i += separator.length; + } + } else if (currentTokenSize <= 0) { // use the separator + // continue until we match the separator + for (int j = 0; j < separator.length; j++) { + if (source[i + j] != separator[j]) { + i++; + continue itersource; + } + // we just matched all separator elements + segments.add(new Range(start, i)); + start = i + separator.length; + i += separator.length; + k++; + } + } else { + throw new IllegalArgumentException("negative size provided"); + } + } + // add the final segment + if (start < source.length && segments.size() < sizes.length) { + // by deduction this can happen only if the token size = NO_LIMIT + segments.add(new Range(start, source.length)); + } + return segments; } + + /** + * Returns a list of ranges identifying [start, end) -- closed, open -- + * positions within the source byte array that would be split using the + * separator byte array. + * + * @param source the source data + * @param separator the separator pattern to look for + * @param limit the maximum number of splits to identify in the source + * @return a list of ranges. + */ + public static List splitRanges(byte[] source, byte[] separator, + int limit) { + List segments = new ArrayList(); + if ((source == null) || (separator == null)) { + return segments; + } + int start = 0; + itersource: for (int i = 0; i < source.length; i++) { + for (int j = 0; j < separator.length; j++) { + if (source[i + j] != separator[j]) { + continue itersource; + } + } + // all separator elements matched + if (limit > 0 && segments.size() >= (limit - 1)) { + // everything else goes in one final segment + break; + } + segments.add(new Range(start, i)); + start = i + separator.length; + // i will be incremented again in outer for loop + i += separator.length - 1; + } + // add in remaining to a final range + if (start <= source.length) { + segments.add(new Range(start, source.length)); + } + return segments; + } + + /** + * Returns a list of ranges identifying [start, end) -- closed, open -- + * positions within the source byte array that would be split using the + * separator byte array. + * + * @param source Source array. + * @param separator Separator represented as a byte array. + * @return a list of ranges. 
+ */ + public static List splitRanges(byte[] source, byte[] separator) { + return splitRanges(source, separator, NO_LIMIT_SPLIT); + } + + /** + * @param source + * @param segments + * @return + */ + static byte[][] split(byte[] source, List segments) { + byte[][] splits = new byte[segments.size()][]; + for (int i = 0; i < segments.size(); i++) { + Range r = segments.get(i); + byte[] tmp = new byte[r.length()]; + if (tmp.length > 0) { + System.arraycopy(source, r.start(), tmp, 0, r.length()); + } + splits[i] = tmp; + } + return splits; + } + + // + public static byte[][] split(byte[] source, byte[] separator, int[] sizes) { + List segments = splitRanges(source, separator, sizes); + return split(source, segments); + } + + /** + * Splits the source array into multiple array segments using the given + * separator, up to a maximum of count items. This will naturally produce + * copied byte arrays for each of the split segments. To identify the split + * ranges without the array copies, see {@link Separator#splitRanges(byte[], byte[])}. + * + * @param source Source array. + * @param separator Separator represented as a byte array. + * @return byte[][] after splitting the source + */ + public static byte[][] split(byte[] source, byte[] separator) { + return split(source, separator, NO_LIMIT_SPLIT); + } + + /** + * Splits the source array into multiple array segments using the given + * separator, up to a maximum of count items. This will naturally produce + * copied byte arrays for each of the split segments. To identify the split + * ranges without the array copies, see {@link Separator#splitRanges(byte[], byte[])}. + * + * @param source Source array. + * @param separator Separator represented as a byte array. + * @param limit a non-positive value indicates no limit on number of segments. + * @return byte[][] after splitting the input source. + */ + public static byte[][] split(byte[] source, byte[] separator, int limit) { + List segments = splitRanges(source, separator, limit); + return split(source, segments); + } + + + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java new file mode 100644 index 0000000..c927842 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+/**
+ * Key converter for plain {@link String} column names.
+ */
+public class StringKeyConverter implements KeyConverter<String> {
+
+  private static final StringKeyConverter INSTANCE = new StringKeyConverter();
+
+  public static StringKeyConverter getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Private constructor.
+   */
+  private StringKeyConverter() {
+  }
+
+  @Override
+  public String decode(byte[] bytes) {
+    return Separator.decode(bytes, Separator.SPACE);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
index 18f975a..d11a6d0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
@@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.common;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -48,11 +47,11 @@ import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilter;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilter.TimelineFilterType;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
@@ -72,109 +71,10 @@ private TimelineStorageUtils() {
 
   private static final Log LOG = LogFactory.getLog(TimelineStorageUtils.class);
 
-  /** empty bytes. */
-  public static final byte[] EMPTY_BYTES = new byte[0];
-
-  /** indicator for no limits for splitting. */
-  public static final int NO_LIMIT_SPLIT = -1;
-
   /** milliseconds in one day. */
   public static final long MILLIS_ONE_DAY = 86400000L;
 
   /**
-   * Splits the source array into multiple array segments using the given
-   * separator, up to a maximum of count items. This will naturally produce
-   * copied byte arrays for each of the split segments. To identify the split
-   * ranges without the array copies, see {@link #splitRanges(byte[], byte[])}.
-   *
-   * @param source Source array.
-   * @param separator Separator represented as a byte array.
- * @return byte[][] after splitting the source - */ - public static byte[][] split(byte[] source, byte[] separator) { - return split(source, separator, NO_LIMIT_SPLIT); - } - - /** - * Splits the source array into multiple array segments using the given - * separator, up to a maximum of count items. This will naturally produce - * copied byte arrays for each of the split segments. To identify the split - * ranges without the array copies, see {@link #splitRanges(byte[], byte[])}. - * - * @param source Source array. - * @param separator Separator represented as a byte array. - * @param limit a non-positive value indicates no limit on number of segments. - * @return byte[][] after splitting the input source. - */ - public static byte[][] split(byte[] source, byte[] separator, int limit) { - List segments = splitRanges(source, separator, limit); - - byte[][] splits = new byte[segments.size()][]; - for (int i = 0; i < segments.size(); i++) { - Range r = segments.get(i); - byte[] tmp = new byte[r.length()]; - if (tmp.length > 0) { - System.arraycopy(source, r.start(), tmp, 0, r.length()); - } - splits[i] = tmp; - } - return splits; - } - - /** - * Returns a list of ranges identifying [start, end) -- closed, open -- - * positions within the source byte array that would be split using the - * separator byte array. - * - * @param source Source array. - * @param separator Separator represented as a byte array. - * @return a list of ranges. - */ - public static List splitRanges(byte[] source, byte[] separator) { - return splitRanges(source, separator, NO_LIMIT_SPLIT); - } - - /** - * Returns a list of ranges identifying [start, end) -- closed, open -- - * positions within the source byte array that would be split using the - * separator byte array. - * - * @param source the source data - * @param separator the separator pattern to look for - * @param limit the maximum number of splits to identify in the source - * @return a list of ranges. - */ - public static List splitRanges(byte[] source, byte[] separator, - int limit) { - List segments = new ArrayList(); - if ((source == null) || (separator == null)) { - return segments; - } - int start = 0; - itersource: for (int i = 0; i < source.length; i++) { - for (int j = 0; j < separator.length; j++) { - if (source[i + j] != separator[j]) { - continue itersource; - } - } - // all separator elements matched - if (limit > 0 && segments.size() >= (limit - 1)) { - // everything else goes in one final segment - break; - } - segments.add(new Range(start, i)); - start = i + separator.length; - // i will be incremented again in outer for loop - i += separator.length - 1; - } - // add in remaining to a final range - if (start <= source.length) { - segments.add(new Range(start, source.length)); - } - return segments; - } - - /** * Converts a timestamp into it's inverse timestamp to be used in (row) keys * where we want to have the most recent timestamp in the top of the table * (scans start at the most recent timestamp first). 
@@ -810,7 +710,8 @@ public static String getAggregationCompactionDimension(List tags) { TimelineEntity entity, Result result, ColumnPrefix prefix, boolean isRelatedTo) throws IOException { // isRelatedTo and relatesTo are of type Map> - Map columns = prefix.readResults(result); + Map columns = + prefix.readResults(result, StringKeyConverter.getInstance()); for (Map.Entry column : columns.entrySet()) { for (String id : Separator.VALUES.splitEncoded( column.getValue().toString())) { @@ -837,7 +738,8 @@ public static String getAggregationCompactionDimension(List tags) { TimelineEntity entity, Result result, ColumnPrefix prefix, boolean isConfig) throws IOException { // info and configuration are of type Map - Map columns = prefix.readResults(result); + Map columns = + prefix.readResults(result, StringKeyConverter.getInstance()); if (isConfig) { for (Map.Entry column : columns.entrySet()) { entity.addConfig(column.getKey(), column.getValue().toString()); @@ -861,30 +763,22 @@ public static String getAggregationCompactionDimension(List tags) { public static void readEvents(TimelineEntity entity, Result result, ColumnPrefix prefix) throws IOException { Map eventsMap = new HashMap<>(); - Map eventsResult = - prefix.readResultsHavingCompoundColumnQualifiers(result); - for (Map.Entry eventResult : eventsResult.entrySet()) { - byte[][] karr = (byte[][])eventResult.getKey(); - // the column name is of the form "eventId=timestamp=infoKey" - if (karr.length == 3) { - String id = Bytes.toString(karr[0]); - long ts = TimelineStorageUtils.invertLong(Bytes.toLong(karr[1])); - String key = Separator.VALUES.joinEncoded(id, Long.toString(ts)); - TimelineEvent event = eventsMap.get(key); - if (event == null) { - event = new TimelineEvent(); - event.setId(id); - event.setTimestamp(ts); - eventsMap.put(key, event); - } - // handle empty info - String infoKey = karr[2].length == 0 ? 
null : Bytes.toString(karr[2]); - if (infoKey != null) { - event.addInfo(infoKey, eventResult.getValue()); - } - } else { - LOG.warn("incorrectly formatted column name: it will be discarded"); - continue; + Map eventsResult = + prefix.readResults(result, EventColumnNameConverter.getInstance()); + for (Map.Entry eventResult : eventsResult.entrySet()) { + EventColumnName eventColumnName = eventResult.getKey(); + String key = eventColumnName.getId() + Long.toString(eventColumnName.getTimestamp()); + // Retrieve previously seen event to add to it + TimelineEvent event = eventsMap.get(key); + if (event == null) { + // First time we're seeing this event, add it to the eventsMap + event = new TimelineEvent(); + event.setId(eventColumnName.getId()); + event.setTimestamp(eventColumnName.getTimestamp()); + eventsMap.put(key, event); + } + if (eventColumnName.getInfoKey() != null) { + event.addInfo(eventColumnName.getInfoKey(), eventResult.getValue()); } } Set eventsSet = new HashSet<>(eventsMap.values()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java index de2b29d..358c525 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java @@ -27,9 +27,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; @@ -233,26 +234,12 @@ public Object readResult(Result result, String qualifier) throws IOException { * * @see * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix - * #readResults(org.apache.hadoop.hbase.client.Result) - */ - public Map readResults(Result result) throws IOException { - return column.readResults(result, columnPrefixBytes); - } - - /** - * @param result from which to read columns - * @return the latest values of columns in the column family. The column - * qualifier is returned as a list of parts, each part a byte[]. This - * is to facilitate returning byte arrays of values that were not - * Strings. If they can be treated as Strings, you should use - * {@link #readResults(Result)} instead. - * @throws IOException if there is any exception encountered while reading - * result. 
+ * #readResults(org.apache.hadoop.hbase.client.Result, + * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter) */ - public Map readResultsHavingCompoundColumnQualifiers(Result result) - throws IOException { - return column.readResultsHavingCompoundColumnQualifiers(result, - columnPrefixBytes); + public Map readResults(Result result, + KeyConverter keyConverter) throws IOException { + return column.readResults(result, columnPrefixBytes, keyConverter); } /* diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java index 188c2fe..345ce1f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; @@ -169,10 +170,12 @@ public Object readResult(Result result, String qualifier) throws IOException { * * @see * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix - * #readResults(org.apache.hadoop.hbase.client.Result) + * #readResults(org.apache.hadoop.hbase.client.Result, + * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter) */ - public Map readResults(Result result) throws IOException { - return column.readResults(result, columnPrefixBytes); + public Map readResults(Result result, + KeyConverter keyConverter) throws IOException { + return column.readResults(result, columnPrefixBytes, keyConverter); } /* @@ -182,7 +185,7 @@ public Object readResult(Result result, String qualifier) throws IOException { * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result) */ - public NavigableMap> + public NavigableMap> readResultsWithTimestamps(Result result) throws IOException { return column.readResultsWithTimestamps(result, columnPrefixBytes); } @@ -280,10 +283,4 @@ public void store(byte[] rowKey, return ColumnHelper.getCompoundColumnQualifierBytes(qualifier, components); } - @Override - public Map readResultsHavingCompoundColumnQualifiers(Result result) - throws IOException { - // There are no compound column qualifiers for flow activity table. 
- return null; - } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java index 77f2ab2..372cc29 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java @@ -26,10 +26,11 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; /** @@ -40,8 +41,7 @@ /** * To store flow run info values. */ - METRIC(FlowRunColumnFamily.INFO, "m", null, - LongConverter.getInstance()); + METRIC(FlowRunColumnFamily.INFO, "m", null, LongConverter.getInstance()); private final ColumnHelper column; private final ColumnFamily columnFamily; @@ -59,10 +59,8 @@ /** * Private constructor, meant to be used by the enum definition. * - * @param columnFamily - * that this column is stored in. - * @param columnPrefix - * for this column. + * @param columnFamily that this column is stored in. + * @param columnPrefix for this column. */ private FlowRunColumnPrefix(ColumnFamily columnFamily, String columnPrefix, AggregationOperation fra, ValueConverter converter) { @@ -79,8 +77,8 @@ private FlowRunColumnPrefix(ColumnFamily columnFamily, this.columnPrefixBytes = null; } else { // Future-proof by ensuring the right column prefix hygiene. 
- this.columnPrefixBytes = Bytes.toBytes(Separator.SPACE - .encode(columnPrefix)); + this.columnPrefixBytes = + Bytes.toBytes(Separator.SPACE.encode(columnPrefix)); } this.aggOp = fra; this.compoundColQual = compoundColQual; @@ -99,14 +97,14 @@ public String getColumnPrefix() { @Override public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) { - return ColumnHelper.getColumnQualifier( - this.columnPrefixBytes, qualifierPrefix); + return ColumnHelper.getColumnQualifier(this.columnPrefixBytes, + qualifierPrefix); } @Override public byte[] getColumnPrefixBytes(String qualifierPrefix) { - return ColumnHelper.getColumnQualifier( - this.columnPrefixBytes, qualifierPrefix); + return ColumnHelper.getColumnQualifier(this.columnPrefixBytes, + qualifierPrefix); } @Override @@ -139,8 +137,8 @@ public void store(byte[] rowKey, } byte[] columnQualifier = getColumnPrefixBytes(qualifier); - Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes( - attributes, this.aggOp); + Attribute[] combinedAttributes = + TimelineStorageUtils.combineAttributes(attributes, this.aggOp); column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, combinedAttributes); } @@ -166,8 +164,8 @@ public void store(byte[] rowKey, } byte[] columnQualifier = getColumnPrefixBytes(qualifier); - Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes( - attributes, this.aggOp); + Attribute[] combinedAttributes = + TimelineStorageUtils.combineAttributes(attributes, this.aggOp); column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, combinedAttributes); } @@ -180,8 +178,8 @@ public void store(byte[] rowKey, * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String) */ public Object readResult(Result result, String qualifier) throws IOException { - byte[] columnQualifier = ColumnHelper.getColumnQualifier( - this.columnPrefixBytes, qualifier); + byte[] columnQualifier = + ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); return column.readResult(result, columnQualifier); } @@ -190,10 +188,12 @@ public Object readResult(Result result, String qualifier) throws IOException { * * @see * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix - * #readResults(org.apache.hadoop.hbase.client.Result) + * #readResults(org.apache.hadoop.hbase.client.Result, + * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter) */ - public Map readResults(Result result) throws IOException { - return column.readResults(result, columnPrefixBytes); + public Map readResults(Result result, + KeyConverter keyConverter) throws IOException { + return column.readResults(result, columnPrefixBytes, keyConverter); } /* @@ -203,8 +203,8 @@ public Object readResult(Result result, String qualifier) throws IOException { * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result) */ - public NavigableMap> - readResultsWithTimestamps(Result result) throws IOException { + public NavigableMap> readResultsWithTimestamps( + Result result) throws IOException { return column.readResultsWithTimestamps(result, columnPrefixBytes); } @@ -213,8 +213,7 @@ public Object readResult(Result result, String qualifier) throws IOException { * no match. 
The following holds true: {@code columnFor(x) == columnFor(y)} if * and only if {@code x.equals(y)} or {@code (x == y == null)} * - * @param columnPrefix - * Name of the column to retrieve + * @param columnPrefix Name of the column to retrieve * @return the corresponding {@link FlowRunColumnPrefix} or null */ public static final FlowRunColumnPrefix columnFor(String columnPrefix) { @@ -242,10 +241,8 @@ public ValueConverter getValueConverter() { * {@code columnFor(a,x) == columnFor(b,y)} if and only if * {@code (x == y == null)} or {@code a.equals(b) & x.equals(y)} * - * @param columnFamily - * The columnFamily for which to retrieve the column. - * @param columnPrefix - * Name of the column to retrieve + * @param columnFamily The columnFamily for which to retrieve the column. + * @param columnPrefix Name of the column to retrieve * @return the corresponding {@link FlowRunColumnPrefix} or null if both * arguments don't match. */ @@ -258,8 +255,8 @@ public static final FlowRunColumnPrefix columnFor( for (FlowRunColumnPrefix frcp : FlowRunColumnPrefix.values()) { // Find a match based column family and on name. if (frcp.columnFamily.equals(columnFamily) - && (((columnPrefix == null) && (frcp.getColumnPrefix() == null)) || - (frcp.getColumnPrefix().equals(columnPrefix)))) { + && (((columnPrefix == null) && (frcp.getColumnPrefix() == null)) || (frcp + .getColumnPrefix().equals(columnPrefix)))) { return frcp; } } @@ -269,18 +266,11 @@ public static final FlowRunColumnPrefix columnFor( } @Override - public byte[] getCompoundColQualBytes(String qualifier, - byte[]...components) { + public byte[] getCompoundColQualBytes(String qualifier, byte[]... components) { if (!compoundColQual) { return ColumnHelper.getColumnQualifier(null, qualifier); } return ColumnHelper.getCompoundColumnQualifierBytes(qualifier, components); } - @Override - public Map readResultsHavingCompoundColumnQualifiers(Result result) - throws IOException { - // There are no compound column qualifiers for flow run table. 
- return null; - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java index 398d7b4..648c77b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java @@ -44,9 +44,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; import com.google.common.annotations.VisibleForTesting; @@ -193,7 +194,7 @@ private boolean nextInternal(List cells, int cellLimit) // So all cells in one qualifier come one after the other before we see the // next column qualifier ByteArrayComparator comp = new ByteArrayComparator(); - byte[] currentColumnQualifier = TimelineStorageUtils.EMPTY_BYTES; + byte[] currentColumnQualifier = Separator.EMPTY_BYTES; AggregationOperation currentAggOp = null; SortedSet currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR); Set alreadySeenAggDim = new HashSet<>(); @@ -314,7 +315,7 @@ private void collectCells(SortedSet currentColumnCells, + " cell qualifier=" + Bytes.toString(CellUtil.cloneQualifier(cell)) + " cell value= " - + (Number) converter.decodeValue(CellUtil.cloneValue(cell)) + + converter.decodeValue(CellUtil.cloneValue(cell)) + " timestamp=" + cell.getTimestamp()); } @@ -480,7 +481,7 @@ private Cell processSummation(SortedSet currentColumnCells, LOG.trace("MAJOR COMPACTION loop sum= " + sum + " discarding now: " + " qualifier=" + Bytes.toString(CellUtil.cloneQualifier(cell)) + " value=" - + (Number) converter.decodeValue(CellUtil.cloneValue(cell)) + + converter.decodeValue(CellUtil.cloneValue(cell)) + " timestamp=" + cell.getTimestamp() + " " + this.action); } } else { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java index d8ca038..211a7b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
@@ -136,7 +137,8 @@ protected TimelineEntity parseEntity(Result result) throws IOException {
     // get the list of run ids along with the version that are associated with
     // this flow on this day
     Map<String, Object> runIdsMap =
-        FlowActivityColumnPrefix.RUN_ID.readResults(result);
+        FlowActivityColumnPrefix.RUN_ID.readResults(result,
+            StringKeyConverter.getInstance());
     for (Map.Entry<String, Object> e : runIdsMap.entrySet()) {
       Long runId = Long.valueOf(e.getKey());
       String version = (String)e.getValue();