diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java index 27a50d5..294b05b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java @@ -77,7 +77,7 @@ private static String getClusterID(String clusterId, Configuration conf) { return reader.getEntities(userId, cluster, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, - metricFilters, eventFilters, fieldsToRetrieve); + metricFilters, eventFilters, null, null, fieldsToRetrieve); } /** @@ -91,6 +91,6 @@ public TimelineEntity getEntity(String userId, String clusterId, String entityId, EnumSet fields) throws IOException { String cluster = getClusterID(clusterId, getConfig()); return reader.getEntity(userId, cluster, flowId, flowRunId, appId, - entityType, entityId, fields); + entityType, entityId, null, null, fields); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareFilter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareFilter.java new file mode 100644 index 0000000..338afb2 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareFilter.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +@Private +@Unstable +public class TimelineCompareFilter extends TimelineFilter { + + /** Comparison Operators. 
*/ + @Private + @Unstable + public enum CompareOp { + LESS_THAN, + LESS_OR_EQUAL, + EQUAL, + NOT_EQUAL, + GREATER_OR_EQUAL, + GREATER_THAN + } + + protected CompareOp compareOp; + private String key; + private Object value; + + public TimelineCompareFilter() { + } + + public TimelineCompareFilter(CompareOp op, String key, Object val) { + this.compareOp = op; + this.key = key; + this.value = val; + } + + @Override + public TimelineFilterType getFilterType() { + return TimelineFilterType.COMPARE; + } + + public CompareOp getCompareOp() { + return compareOp; + } + + public String getKey() { + return key; + } + + public Object getValue() { + return value; + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilter.java new file mode 100644 index 0000000..cb074d4 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilter.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +@Private +@Unstable +public abstract class TimelineFilter { + + /** + * Lists the different filter types. + */ + @Private + @Unstable + public enum TimelineFilterType { + /** + * Combines multiple filters. + */ + LIST, + /** + * Filter which is used for comparison. 
+ */ + COMPARE, + /** + * Filter which matches a prefix for a config or a metric. + */ + PREFIX + } + + public abstract TimelineFilterType getFilterType(); + + public String toString() { + return this.getClass().getSimpleName(); + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterList.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterList.java new file mode 100644 index 0000000..182cd7a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterList.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +@Private +@Unstable +public class TimelineFilterList extends TimelineFilter { + @Private + @Unstable + public static enum Operator { + AND, + OR + } + + private Operator operator; + private List<TimelineFilter> filterList = new ArrayList<TimelineFilter>(); + + public TimelineFilterList(TimelineFilter... filters) { + this(Operator.AND, filters); + } + + public TimelineFilterList(Operator op, TimelineFilter... filters) { + this.operator = op; + this.filterList = new ArrayList<TimelineFilter>(Arrays.asList(filters)); + } + + @Override + public TimelineFilterType getFilterType() { + return TimelineFilterType.LIST; + } + + /** + * Get the filter list. + * + * @return filterList + */ + public List<TimelineFilter> getFilterList() { + return filterList; + } + + /** + * Get the operator.
+ * + * @return operator + */ + public Operator getOperator() { + return operator; + } + + public void setOperator(Operator op) { + operator = op; + } + + public void addFilter(TimelineFilter filter) { + filterList.add(filter); + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java new file mode 100644 index 0000000..8afa026 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import java.nio.charset.Charset; + +import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.hbase.filter.QualifierFilter; + +public class TimelineFilterUtils { + + /** + * Returns the equivalent HBase filter list's {@link Operator}. + * @param op timeline filter list operator to convert. + * @return HBase filter list's Operator + */ + private static Operator getHBaseOperator(TimelineFilterList.Operator op) { + switch (op) { + case AND: + return Operator.MUST_PASS_ALL; + case OR: + return Operator.MUST_PASS_ONE; + default: + throw new IllegalArgumentException("Invalid operator"); + } + } + + /** + * Returns the equivalent HBase compare filter's {@link CompareOp}. + * @param op timeline compare operator to convert. + * @return HBase compare filter's CompareOp + */ + private static CompareOp getHBaseCompareOp( + TimelineCompareFilter.CompareOp op) { + switch (op) { + case LESS_THAN: + return CompareOp.LESS; + case LESS_OR_EQUAL: + return CompareOp.LESS_OR_EQUAL; + case EQUAL: + return CompareOp.EQUAL; + case NOT_EQUAL: + return CompareOp.NOT_EQUAL; + case GREATER_OR_EQUAL: + return CompareOp.GREATER_OR_EQUAL; + case GREATER_THAN: + return CompareOp.GREATER; + default: + throw new IllegalArgumentException("Invalid compare operator"); + } + } + + private static byte[] joinPrefixes(byte[] prefix1, byte[] prefix2) { + if (prefix1 == null || prefix1.length == 0) { + return prefix2; + } + if (prefix2 == null || prefix2.length == 0) { + return prefix1; + } + byte[] prefix = new byte[prefix1.length + prefix2.length]; + System.arraycopy(prefix1, 0, prefix, 0,
prefix1.length); + System.arraycopy(prefix2, 0, prefix, prefix1.length, prefix2.length); + return prefix; + } + + /** + * Converts a {@link TimelinePrefixFilter} to an equivalent HBase + * {@link QualifierFilter}. + * @param colPrefix column prefix bytes prepended to the filter's prefix. + * @param filter timeline prefix filter to convert. + * @return a {@link QualifierFilter} object + */ + private static Filter createHBaseColQualPrefixFilter(byte[] colPrefix, + TimelinePrefixFilter filter) { + return new QualifierFilter(getHBaseCompareOp(filter.getCompareOp()), + new BinaryPrefixComparator(joinPrefixes(colPrefix, + filter.getPrefix().getBytes(Charset.forName("UTF-8"))))); + } + + /** + * Creates an equivalent HBase {@link FilterList} from a + * {@link TimelineFilterList} while converting the different timeline filters + * (of type {@link TimelineFilter}) into their equivalent HBase filters. + * @param filterList timeline filter list to convert. + * @return a {@link FilterList} object + */ + public static FilterList createHBaseFilterList( + TimelineFilterList filterList) { + return createHBaseFilterList(null, filterList); + } + + /** + * Creates an equivalent HBase {@link FilterList} from a + * {@link TimelineFilterList} while converting the different timeline filters + * (of type {@link TimelineFilter}) into their equivalent HBase filters. + * @param colPrefix column prefix bytes prepended to each prefix filter. + * @param filterList timeline filter list to convert. + * @return a {@link FilterList} object + */ + public static FilterList createHBaseFilterList(byte[] colPrefix, + TimelineFilterList filterList) { + FilterList list = new FilterList(getHBaseOperator(filterList.getOperator())); + for (TimelineFilter filter : filterList.getFilterList()) { + switch (filter.getFilterType()) { + case LIST: + list.addFilter( + createHBaseFilterList(colPrefix, (TimelineFilterList)filter)); + break; + case PREFIX: + list.addFilter(createHBaseColQualPrefixFilter( + colPrefix, (TimelinePrefixFilter)filter)); + break; + default: + break; + } + } + return list; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelinePrefixFilter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelinePrefixFilter.java new file mode 100644 index 0000000..a249a89 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelinePrefixFilter.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +@Private +@Unstable +public class TimelinePrefixFilter extends TimelineCompareFilter { + + private String prefix; + + public TimelinePrefixFilter(CompareOp op, String prefix) { + this.prefix = prefix; + if (op != CompareOp.EQUAL && op != CompareOp.NOT_EQUAL) { + throw new IllegalArgumentException("CompareOp for prefix filter should " + + "be EQUAL or NOT_EQUAL"); + } + this.compareOp = op; + } + + @Override + public TimelineFilterType getFilterType() { + return TimelineFilterType.PREFIX; + } + + public String getPrefix() { + return prefix; + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java new file mode 100644 index 0000000..62c3122 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java index 8324afd..eff0fdf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java @@ -28,11 +28,21 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.hbase.filter.QualifierFilter; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; @@ -56,18 +66,20 @@ public ApplicationEntityReader(String userId, String clusterId, Map> relatesTo, Map> isRelatedTo, Map infoFilters, Map configFilters, Set metricFilters, Set eventFilters, + TimelineFilterList confs, TimelineFilterList metrics, EnumSet fieldsToRetrieve) { super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, - eventFilters, fieldsToRetrieve, true); + eventFilters, confs, metrics, fieldsToRetrieve, true); } public ApplicationEntityReader(String userId, String clusterId, String flowId, Long flowRunId, String appId, String entityType, - String entityId, EnumSet fieldsToRetrieve) { + String entityId, TimelineFilterList confs, TimelineFilterList metrics, + EnumSet fieldsToRetrieve) { super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId, - fieldsToRetrieve); + confs, metrics, 
fieldsToRetrieve); } /** @@ -78,13 +90,88 @@ public ApplicationEntityReader(String userId, String clusterId, } @Override - protected Result getResult(Configuration hbaseConf, Connection conn) - throws IOException { + protected FilterList constructFilterListBasedOnFields() { + FilterList list = new FilterList(Operator.MUST_PASS_ONE); + // Fetch all the columns. + if (fieldsToRetrieve.contains(Field.ALL) && + (confs == null || confs.getFilterList().isEmpty()) && + (metrics == null || metrics.getFilterList().isEmpty())) { + return list; + } + FilterList infoColFamilyList = new FilterList(); + // By default fetch everything in INFO column family. + FamilyFilter infoColumnFamily = + new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(ApplicationColumnFamily.INFO.getBytes())); + infoColFamilyList.addFilter(infoColumnFamily); + // Events not required. + if (!fieldsToRetrieve.contains(Field.EVENTS) && + !fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + ApplicationColumnPrefix.EVENT.getColumnPrefixBytes("")))); + } + // info not required. + if (!fieldsToRetrieve.contains(Field.INFO) && + !fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + ApplicationColumnPrefix.INFO.getColumnPrefixBytes("")))); + } + // is related to not required. + if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) && + !fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + ApplicationColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes("")))); + } + // relates to not required.
+ if (!fieldsToRetrieve.contains(Field.RELATES_TO) && + !fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + ApplicationColumnPrefix.RELATES_TO.getColumnPrefixBytes("")))); + } + list.addFilter(infoColFamilyList); + if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) || + (confs != null && !confs.getFilterList().isEmpty())) { + FilterList filterCfg = + new FilterList(new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(ApplicationColumnFamily.CONFIGS.getBytes()))); + if (confs != null && !confs.getFilterList().isEmpty()) { + filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList(confs)); + } + list.addFilter(filterCfg); + } + if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) || + (metrics != null && !metrics.getFilterList().isEmpty())) { + FilterList filterMetrics = + new FilterList(new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(ApplicationColumnFamily.METRICS.getBytes()))); + if (metrics != null && !metrics.getFilterList().isEmpty()) { + filterMetrics.addFilter( + TimelineFilterUtils.createHBaseFilterList(metrics)); + } + list.addFilter(filterMetrics); + } + return list; + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { byte[] rowKey = ApplicationRowKey.getRowKey(clusterId, userId, flowId, flowRunId, appId); Get get = new Get(rowKey); get.setMaxVersions(Integer.MAX_VALUE); + if (filterList != null && !filterList.getFilters().isEmpty()) { + get.setFilter(filterList); + } return table.getResult(hbaseConf, conn, get); } @@ -115,6 +202,14 @@ protected void augmentParams(Configuration hbaseConf, Connection conn) if (fieldsToRetrieve == null) { fieldsToRetrieve = EnumSet.noneOf(Field.class); } + if (!fieldsToRetrieve.contains(Field.CONFIGS) && + confs != null && !confs.getFilterList().isEmpty()) { + fieldsToRetrieve.add(Field.CONFIGS); + } + if (!fieldsToRetrieve.contains(Field.METRICS) && + metrics != null && !metrics.getFilterList().isEmpty()) { + fieldsToRetrieve.add(Field.METRICS); + } if (!singleEntityRead) { if (limit == null || limit < 0) { limit = TimelineReader.DEFAULT_LIMIT; @@ -136,7 +231,7 @@ protected void augmentParams(Configuration hbaseConf, Connection conn) @Override protected ResultScanner getResults(Configuration hbaseConf, - Connection conn) throws IOException { + Connection conn, FilterList filterList) throws IOException { Scan scan = new Scan(); if (flowRunId != null) { scan.setRowPrefixFilter(ApplicationRowKey. @@ -145,7 +240,12 @@ protected ResultScanner getResults(Configuration hbaseConf, scan.setRowPrefixFilter(ApplicationRowKey. 
getRowKeyPrefix(clusterId, userId, flowId)); } - scan.setFilter(new PageFilter(limit)); + FilterList newList = new FilterList(); + newList.addFilter(new PageFilter(limit)); + if (filterList != null && !filterList.getFilters().isEmpty()) { + newList.addFilter(filterList); + } + scan.setFilter(newList); return table.getResultScanner(hbaseConf, conn, scan); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java index 30d1d00..d99e5c0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; import org.codehaus.jackson.JsonGenerationException; @@ -272,6 +273,7 @@ private static TimelineEntity readEntityFromFile(BufferedReader reader) Map> relatesTo, Map> isRelatedTo, Map infoFilters, Map configFilters, Set metricFilters, Set eventFilters, + TimelineFilterList confs, TimelineFilterList metrics, EnumSet fieldsToRetrieve) throws IOException { if (limit == null || limit <= 0) { limit = DEFAULT_LIMIT; @@ -386,7 +388,8 @@ public void serviceInit(Configuration conf) throws Exception { @Override public TimelineEntity getEntity(String userId, String clusterId, String flowId, Long flowRunId, String appId, String entityType, - String entityId, EnumSet fieldsToRetrieve) throws IOException { + String entityId, TimelineFilterList confs, TimelineFilterList metrics, + EnumSet fieldsToRetrieve) throws IOException { String flowRunPath = getFlowRunPath(userId, clusterId, flowId, flowRunId, appId); File dir = new File(new File(rootPath, ENTITIES_DIR), @@ -413,6 +416,7 @@ public TimelineEntity getEntity(String userId, String clusterId, Map> relatesTo, Map> isRelatedTo, Map infoFilters, Map configFilters, Set metricFilters, Set eventFilters, + TimelineFilterList confs, TimelineFilterList metrics, EnumSet fieldsToRetrieve) throws IOException { String flowRunPath = getFlowRunPath(userId, clusterId, flowId, flowRunId, appId); @@ -422,6 +426,6 @@ public TimelineEntity getEntity(String userId, String clusterId, return getEntities(dir, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, - eventFilters, fieldsToRetrieve); + eventFilters, confs, metrics, fieldsToRetrieve); } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java index 3e32128..71dd0a1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; @@ -58,14 +59,14 @@ public FlowActivityEntityReader(String userId, String clusterId, super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, - eventFilters, fieldsToRetrieve, true); + eventFilters, null, null, fieldsToRetrieve, true); } public FlowActivityEntityReader(String userId, String clusterId, String flowId, Long flowRunId, String appId, String entityType, String entityId, EnumSet fieldsToRetrieve) { super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId, - fieldsToRetrieve); + null, null, fieldsToRetrieve); } /** @@ -96,15 +97,20 @@ protected void augmentParams(Configuration hbaseConf, Connection conn) } @Override - protected Result getResult(Configuration hbaseConf, Connection conn) - throws IOException { + protected FilterList constructFilterListBasedOnFields() { + return null; + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { throw new UnsupportedOperationException( "we don't support a single entity query"); } @Override protected ResultScanner getResults(Configuration hbaseConf, - Connection conn) throws IOException { + Connection conn, FilterList filterList) throws IOException { Scan scan = new Scan(); if (createdTimeBegin == DEFAULT_BEGIN_TIME && createdTimeEnd == DEFAULT_END_TIME) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java index c4b4e91..436ee22 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java @@ -28,12 +28,22 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import 
org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; @@ -54,18 +64,20 @@ public FlowRunEntityReader(String userId, String clusterId, Map> relatesTo, Map> isRelatedTo, Map infoFilters, Map configFilters, Set metricFilters, Set eventFilters, + TimelineFilterList confs, TimelineFilterList metrics, EnumSet fieldsToRetrieve) { super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, - eventFilters, fieldsToRetrieve, true); + eventFilters, null, metrics, fieldsToRetrieve, true); } public FlowRunEntityReader(String userId, String clusterId, String flowId, Long flowRunId, String appId, String entityType, - String entityId, EnumSet fieldsToRetrieve) { + String entityId, TimelineFilterList confs, TimelineFilterList metrics, + EnumSet fieldsToRetrieve) { super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId, - fieldsToRetrieve); + null, metrics, fieldsToRetrieve); } /** @@ -101,26 +113,67 @@ protected void augmentParams(Configuration hbaseConf, Connection conn) { if (createdTimeEnd == null) { createdTimeEnd = DEFAULT_END_TIME; } + if (!fieldsToRetrieve.contains(Field.METRICS) && + metrics != null && !metrics.getFilterList().isEmpty()) { + fieldsToRetrieve.add(Field.METRICS); + } } } @Override - protected Result getResult(Configuration hbaseConf, Connection conn) - throws IOException { + protected FilterList constructFilterListBasedOnFields() { + FilterList list = new FilterList(Operator.MUST_PASS_ONE); + + // By default fetch everything in INFO column family. + FamilyFilter infoColumnFamily = + new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(FlowRunColumnFamily.INFO.getBytes())); + // Metrics not required. 
+ if (!singleEntityRead && !fieldsToRetrieve.contains(Field.METRICS) && + !fieldsToRetrieve.contains(Field.ALL)) { + FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE); + infoColFamilyList.addFilter(infoColumnFamily); + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + FlowRunColumnPrefix.METRIC.getColumnPrefixBytes("")))); + list.addFilter(infoColFamilyList); + } + if (metrics != null && !metrics.getFilterList().isEmpty()) { + FilterList infoColFamilyList = new FilterList(); + infoColFamilyList.addFilter(infoColumnFamily); + infoColFamilyList.addFilter(TimelineFilterUtils.createHBaseFilterList( + FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(""), metrics)); + list.addFilter(infoColFamilyList); + } + return list; + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowId, flowRunId); Get get = new Get(rowKey); get.setMaxVersions(Integer.MAX_VALUE); + if (filterList != null && !filterList.getFilters().isEmpty()) { + get.setFilter(filterList); + } return table.getResult(hbaseConf, conn, get); } @Override protected ResultScanner getResults(Configuration hbaseConf, - Connection conn) throws IOException { + Connection conn, FilterList filterList) throws IOException { Scan scan = new Scan(); scan.setRowPrefixFilter( FlowRunRowKey.getRowKeyPrefix(clusterId, userId, flowId)); - scan.setFilter(new PageFilter(limit)); + FilterList newList = new FilterList(); + newList.addFilter(new PageFilter(limit)); + if (filterList != null && !filterList.getFilters().isEmpty()) { + newList.addFilter(filterList); + } + scan.setFilter(newList); return table.getResultScanner(hbaseConf, conn, scan); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java index 04fc8ee..35c45f1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java @@ -32,9 +32,18 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; import 
org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; @@ -46,6 +55,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; @@ -72,18 +82,20 @@ public GenericEntityReader(String userId, String clusterId, Map> relatesTo, Map> isRelatedTo, Map infoFilters, Map configFilters, Set metricFilters, Set eventFilters, + TimelineFilterList confs, TimelineFilterList metrics, EnumSet fieldsToRetrieve, boolean sortedKeys) { super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, - eventFilters, fieldsToRetrieve, sortedKeys); + eventFilters, confs, metrics, fieldsToRetrieve, sortedKeys); } public GenericEntityReader(String userId, String clusterId, String flowId, Long flowRunId, String appId, String entityType, - String entityId, EnumSet fieldsToRetrieve) { + String entityId, TimelineFilterList confs, TimelineFilterList metrics, + EnumSet fieldsToRetrieve) { super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId, - fieldsToRetrieve); + confs, metrics, fieldsToRetrieve); } /** @@ -93,6 +105,78 @@ public GenericEntityReader(String userId, String clusterId, return ENTITY_TABLE; } + @Override + protected FilterList constructFilterListBasedOnFields() { + FilterList list = new FilterList(Operator.MUST_PASS_ONE); + // Fetch all the columns. + if (fieldsToRetrieve.contains(Field.ALL) && + (confs == null || confs.getFilterList().isEmpty()) && + (metrics == null || metrics.getFilterList().isEmpty())) { + return list; + } + FilterList infoColFamilyList = new FilterList(); + // By default fetch everything in INFO column family. + FamilyFilter infoColumnFamily = + new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(EntityColumnFamily.INFO.getBytes())); + infoColFamilyList.addFilter(infoColumnFamily); + // Events not required. + if (!fieldsToRetrieve.contains(Field.EVENTS) && + !fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + EntityColumnPrefix.EVENT.getColumnPrefixBytes("")))); + } + // info not required. + if (!fieldsToRetrieve.contains(Field.INFO) && + !fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + EntityColumnPrefix.INFO.getColumnPrefixBytes("")))); + } + // is related to not required.
+ if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) && + !fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + EntityColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes("")))); + } + // relates to not required. + if (!fieldsToRetrieve.contains(Field.RELATES_TO) && + !fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + EntityColumnPrefix.RELATES_TO.getColumnPrefixBytes("")))); + } + list.addFilter(infoColFamilyList); + if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) || + (confs != null && !confs.getFilterList().isEmpty())) { + FilterList filterCfg = + new FilterList(new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(EntityColumnFamily.CONFIGS.getBytes()))); + if (confs != null && !confs.getFilterList().isEmpty()) { + filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList(confs)); + } + list.addFilter(filterCfg); + } + if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) || + (metrics != null && !metrics.getFilterList().isEmpty())) { + FilterList filterMetrics = + new FilterList(new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(EntityColumnFamily.METRICS.getBytes()))); + if (metrics != null && !metrics.getFilterList().isEmpty()) { + filterMetrics.addFilter( + TimelineFilterUtils.createHBaseFilterList(metrics)); + } + list.addFilter(filterMetrics); + } + return list; + } + protected FlowContext lookupFlowContext(String clusterId, String appId, Configuration hbaseConf, Connection conn) throws IOException { byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId); @@ -145,6 +229,14 @@ protected void augmentParams(Configuration hbaseConf, Connection conn) if (fieldsToRetrieve == null) { fieldsToRetrieve = EnumSet.noneOf(Field.class); } + if (!fieldsToRetrieve.contains(Field.CONFIGS) && + confs != null && !confs.getFilterList().isEmpty()) { + fieldsToRetrieve.add(Field.CONFIGS); + } + if (!fieldsToRetrieve.contains(Field.METRICS) && + metrics != null && !metrics.getFilterList().isEmpty()) { + fieldsToRetrieve.add(Field.METRICS); + } if (!singleEntityRead) { if (limit == null || limit < 0) { limit = TimelineReader.DEFAULT_LIMIT; @@ -165,25 +257,31 @@ protected void augmentParams(Configuration hbaseConf, Connection conn) } @Override - protected Result getResult(Configuration hbaseConf, Connection conn) - throws IOException { + protected Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { byte[] rowKey = EntityRowKey.getRowKey(clusterId, userId, flowId, flowRunId, appId, entityType, entityId); Get get = new Get(rowKey); get.setMaxVersions(Integer.MAX_VALUE); + if (filterList != null && !filterList.getFilters().isEmpty()) { + get.setFilter(filterList); + } return table.getResult(hbaseConf, conn, get); } @Override protected ResultScanner getResults(Configuration hbaseConf, - Connection conn) throws IOException { + Connection conn, FilterList filterList) throws IOException { // Scan through part of the table to find the entities belonging to one + // app and one type Scan scan = new Scan(); scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix( clusterId, userId, flowId, flowRunId, appId, entityType)); scan.setMaxVersions(Integer.MAX_VALUE); + if (filterList != null && !filterList.getFilters().isEmpty()) { + scan.setFilter(filterList); + } return
table.getResultScanner(hbaseConf, conn, scan); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java index 889ae19..646cbf2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; public class HBaseTimelineReaderImpl extends AbstractService implements TimelineReader { @@ -64,11 +65,13 @@ protected void serviceStop() throws Exception { @Override public TimelineEntity getEntity(String userId, String clusterId, String flowId, Long flowRunId, String appId, String entityType, - String entityId, EnumSet fieldsToRetrieve) + String entityId, TimelineFilterList confs, TimelineFilterList metrics, + EnumSet fieldsToRetrieve) throws IOException { TimelineEntityReader reader = TimelineEntityReaderFactory.createSingleEntityReader(userId, clusterId, - flowId, flowRunId, appId, entityType, entityId, fieldsToRetrieve); + flowId, flowRunId, appId, entityType, entityId, confs, metrics, + fieldsToRetrieve); return reader.readEntity(hbaseConf, conn); } @@ -80,13 +83,14 @@ public TimelineEntity getEntity(String userId, String clusterId, Map> relatesTo, Map> isRelatedTo, Map infoFilters, Map configFilters, Set metricFilters, Set eventFilters, + TimelineFilterList confs, TimelineFilterList metrics, EnumSet fieldsToRetrieve) throws IOException { TimelineEntityReader reader = TimelineEntityReaderFactory.createMultipleEntitiesReader(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters, - metricFilters, eventFilters, fieldsToRetrieve); + metricFilters, eventFilters, confs, metrics, fieldsToRetrieve); return reader.readEntities(hbaseConf, conn); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java index adaf42e..0081bc6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java @@ -31,8 +31,10 @@ import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; 
+import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; @@ -70,6 +72,8 @@ protected Map configFilters; protected Set metricFilters; protected Set eventFilters; + protected TimelineFilterList confs; + protected TimelineFilterList metrics; /** * Main table the entity reader uses. @@ -94,6 +98,7 @@ protected TimelineEntityReader(String userId, String clusterId, Map> relatesTo, Map> isRelatedTo, Map infoFilters, Map configFilters, Set metricFilters, Set eventFilters, + TimelineFilterList confs, TimelineFilterList metrics, EnumSet fieldsToRetrieve, boolean sortedKeys) { this.singleEntityRead = false; this.sortedKeys = sortedKeys; @@ -115,6 +120,8 @@ protected TimelineEntityReader(String userId, String clusterId, this.configFilters = configFilters; this.metricFilters = metricFilters; this.eventFilters = eventFilters; + this.confs = confs; + this.metrics = metrics; this.table = getTable(); } @@ -124,7 +131,8 @@ protected TimelineEntityReader(String userId, String clusterId, */ protected TimelineEntityReader(String userId, String clusterId, String flowId, Long flowRunId, String appId, String entityType, - String entityId, EnumSet fieldsToRetrieve) { + String entityId, TimelineFilterList confs, TimelineFilterList metrics, + EnumSet fieldsToRetrieve) { this.singleEntityRead = true; this.userId = userId; this.clusterId = clusterId; @@ -134,10 +142,14 @@ protected TimelineEntityReader(String userId, String clusterId, this.entityType = entityType; this.fieldsToRetrieve = fieldsToRetrieve; this.entityId = entityId; + this.confs = confs; + this.metrics = metrics; this.table = getTable(); } + protected abstract FilterList constructFilterListBasedOnFields(); + /** * Reads and deserializes a single timeline entity from the HBase storage. */ @@ -146,7 +158,8 @@ public TimelineEntity readEntity(Configuration hbaseConf, Connection conn) validateParams(); augmentParams(hbaseConf, conn); - Result result = getResult(hbaseConf, conn); + FilterList filterList = constructFilterListBasedOnFields(); + Result result = getResult(hbaseConf, conn, filterList); if (result == null || result.isEmpty()) { // Could not find a matching row. LOG.info("Cannot find matching entity of type " + entityType); @@ -166,7 +179,8 @@ public TimelineEntity readEntity(Configuration hbaseConf, Connection conn) augmentParams(hbaseConf, conn); NavigableSet entities = new TreeSet<>(); - ResultScanner results = getResults(hbaseConf, conn); + FilterList filterList = constructFilterListBasedOnFields(); + ResultScanner results = getResults(hbaseConf, conn, filterList); try { for (Result result : results) { TimelineEntity entity = parseEntity(result); @@ -211,14 +225,14 @@ protected abstract void augmentParams(Configuration hbaseConf, * * @return the {@link Result} instance or null if no such record is found. 
*/ - protected abstract Result getResult(Configuration hbaseConf, Connection conn) - throws IOException; + protected abstract Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException; /** * Fetches a {@link ResultScanner} for a multi-entity read. */ protected abstract ResultScanner getResults(Configuration hbaseConf, - Connection conn) throws IOException; + Connection conn, FilterList filterList) throws IOException; /** * Given a {@link Result} instance, deserializes and creates a diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java index f5341c2..16204c5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java @@ -22,6 +22,7 @@ import java.util.Set; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; /** @@ -34,22 +35,23 @@ */ public static TimelineEntityReader createSingleEntityReader(String userId, String clusterId, String flowId, Long flowRunId, String appId, - String entityType, String entityId, EnumSet fieldsToRetrieve) { + String entityType, String entityId, TimelineFilterList confs, + TimelineFilterList metrics, EnumSet fieldsToRetrieve) { // currently the types that are handled separate from the generic entity // table are application, flow run, and flow activity entities if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) { return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId, - appId, entityType, entityId, fieldsToRetrieve); + appId, entityType, entityId, confs, metrics, fieldsToRetrieve); } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) { return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId, - appId, entityType, entityId, fieldsToRetrieve); + appId, entityType, entityId, confs, metrics, fieldsToRetrieve); } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) { return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId, appId, entityType, entityId, fieldsToRetrieve); } else { // assume we're dealing with a generic entity read return new GenericEntityReader(userId, clusterId, flowId, flowRunId, - appId, entityType, entityId, fieldsToRetrieve); + appId, entityType, entityId, confs, metrics, fieldsToRetrieve); } } @@ -64,6 +66,7 @@ public static TimelineEntityReader createMultipleEntitiesReader(String userId, Map> relatesTo, Map> isRelatedTo, Map infoFilters, Map configFilters, Set metricFilters, Set eventFilters, + TimelineFilterList confs, TimelineFilterList metrics, EnumSet fieldsToRetrieve) { // currently the types that are handled separate from the generic entity // table are application, flow run, and flow activity entities @@ -71,8 +74,8 @@ public static TimelineEntityReader 
createMultipleEntitiesReader(String userId, return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, - infoFilters, configFilters, metricFilters, eventFilters, - fieldsToRetrieve); + infoFilters, configFilters, metricFilters, eventFilters, confs, + metrics, fieldsToRetrieve); } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) { return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, @@ -83,15 +86,15 @@ public static TimelineEntityReader createMultipleEntitiesReader(String userId, return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, - infoFilters, configFilters, metricFilters, eventFilters, - fieldsToRetrieve); + infoFilters, configFilters, metricFilters, eventFilters, confs, + metrics, fieldsToRetrieve); } else { // assume we're dealing with a generic entity read return new GenericEntityReader(userId, clusterId, flowId, flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, - infoFilters, configFilters, metricFilters, eventFilters, - fieldsToRetrieve, false); + infoFilters, configFilters, metricFilters, eventFilters, confs, + metrics, fieldsToRetrieve, false); } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java index e4e305e..ac2fb18 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java @@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; /** ATSv2 reader interface. 
*/ @Private @@ -81,6 +82,7 @@ */ TimelineEntity getEntity(String userId, String clusterId, String flowId, Long flowRunId, String appId, String entityType, String entityId, + TimelineFilterList confs, TimelineFilterList metrics, EnumSet fieldsToRetrieve) throws IOException; /** @@ -158,5 +160,6 @@ TimelineEntity getEntity(String userId, String clusterId, String flowId, Map> relatesTo, Map> isRelatedTo, Map infoFilters, Map configFilters, Set metricFilters, Set eventFilters, + TimelineFilterList confs, TimelineFilterList metrics, EnumSet fieldsToRetrieve) throws IOException; } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java index d7b5773..f5fde8e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java @@ -102,6 +102,11 @@ private String getColumnPrefix() { return columnPrefix; } + @Override + public byte[] getColumnPrefixBytes(String qualifier) { + return ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); + } + /* * (non-Javadoc) * diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java index db49098..09238cb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java @@ -107,4 +107,11 @@ public void store(byte[] rowKey, TypedBufferedMutator tableMutator, public NavigableMap> readResultsWithTimestamps(Result result) throws IOException; + /** + * @param qualifier Column qualifier corresponding to the prefix. 
+ * @return column qualifier bytes: the column prefix bytes of this instance + * concatenated with the bytes of the passed qualifier. + */ + public byte[] getColumnPrefixBytes(String qualifier); } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java index 0d4e5a8..19f21c9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java @@ -102,6 +102,11 @@ public String getColumnPrefix() { return columnPrefix; } + @Override + public byte[] getColumnPrefixBytes(String qualifier) { + return ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); + } + /* * (non-Javadoc) * diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java index 38c0f3f..b331d44 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java @@ -83,6 +83,11 @@ public String getColumnPrefix() { return columnPrefix; } + @Override + public byte[] getColumnPrefixBytes(String qualifier) { + return ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); + } + public byte[] getColumnPrefixBytes() { return columnPrefixBytes.clone(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java index b090bba..66fd7f2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java @@ -82,6 +82,11 @@ public String getColumnPrefix() { return columnPrefix; } + @Override + public byte[] getColumnPrefixBytes(String qualifier) { + return ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); + } + public byte[] getColumnPrefixBytes() { return columnPrefixBytes.clone(); }
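All four column-prefix enums implement the new hook identically, delegating to ColumnHelper.getColumnQualifier to concatenate the stored prefix bytes with the caller's qualifier. That is what lets a reader translate a TimelinePrefixFilter into an HBase qualifier match without knowing which table it scans. Below is a minimal sketch of that translation, assuming HBase 1.x filter classes and a getPrefix() accessor on TimelinePrefixFilter; the helper itself is hypothetical and not part of this patch:

import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.QualifierFilter;

// Hypothetical helper: map one TimelinePrefixFilter onto an HBase filter.
static Filter toQualifierFilter(TimelinePrefixFilter prefixFilter,
    ColumnPrefix colPrefix) {
  // Stored column prefix + requested prefix = qualifier prefix to match.
  byte[] qualifierPrefix =
      colPrefix.getColumnPrefixBytes(prefixFilter.getPrefix());
  // EQUAL with a BinaryPrefixComparator keeps every column whose
  // qualifier starts with those bytes.
  return new QualifierFilter(CompareFilter.CompareOp.EQUAL,
      new BinaryPrefixComparator(qualifierPrefix));
}

Per-filter translations like this one can then be combined into the org.apache.hadoop.hbase.filter.FilterList that getResult/getResults now accept.
diff --git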
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java index 4e23e49..e864d61 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java @@ -266,7 +266,7 @@ public void testGetEntityDefaultView() throws Exception { // only the id, created and modified time TimelineEntity result = reader.getEntity("user1", "cluster1", "flow1", 1L, "app1", - "app", "id_1", null); + "app", "id_1", null, null, null); Assert.assertEquals( (new TimelineEntity.Identifier("app", "id_1")).toString(), result.getIdentifier().toString()); @@ -281,7 +281,7 @@ public void testGetEntityByClusterAndApp() throws Exception { // Cluster and AppId should be enough to get an entity. TimelineEntity result = reader.getEntity(null, "cluster1", null, null, "app1", - "app", "id_1", null); + "app", "id_1", null, null, null); Assert.assertEquals( (new TimelineEntity.Identifier("app", "id_1")).toString(), result.getIdentifier().toString()); @@ -298,7 +298,7 @@ public void testAppFlowMappingCsv() throws Exception { // in app flow mapping csv has commas. TimelineEntity result = reader.getEntity(null, "cluster1", null, null, "app2", - "app", "id_5", null); + "app", "id_5", null, null, null); Assert.assertEquals( (new TimelineEntity.Identifier("app", "id_5")).toString(), result.getIdentifier().toString()); @@ -311,7 +311,7 @@ public void testGetEntityCustomFields() throws Exception { // Specified fields in addition to default view will be returned. TimelineEntity result = reader.getEntity("user1", "cluster1", "flow1", 1L, - "app1", "app", "id_1", + "app1", "app", "id_1", null, null, EnumSet.of(Field.INFO, Field.CONFIGS, Field.METRICS)); Assert.assertEquals( (new TimelineEntity.Identifier("app", "id_1")).toString(), @@ -329,8 +329,8 @@ public void testGetEntityCustomFields() throws Exception { public void testGetEntityAllFields() throws Exception { // All fields of TimelineEntity will be returned. 
TimelineEntity result = - reader.getEntity("user1", "cluster1", "flow1", 1L, - "app1", "app", "id_1", EnumSet.of(Field.ALL)); + reader.getEntity("user1", "cluster1", "flow1", 1L, "app1", "app", + "id_1", null, null, EnumSet.of(Field.ALL)); Assert.assertEquals( (new TimelineEntity.Identifier("app", "id_1")).toString(), result.getIdentifier().toString()); @@ -347,7 +347,7 @@ public void testGetAllEntities() throws Exception { Set result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, null, null, null, null, null, null, null, - null, null); + null, null, null, null); // All 4 entities will be returned Assert.assertEquals(4, result.size()); } @@ -357,7 +357,7 @@ public void testGetEntitiesWithLimit() throws Exception { Set result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", 2L, null, null, null, null, null, null, null, null, null, - null, null); + null, null, null, null); Assert.assertEquals(2, result.size()); // Needs to be rewritten once hashcode and equals for // TimelineEntity are implemented @@ -371,7 +371,7 @@ public void testGetEntitiesWithLimit() throws Exception { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", 3L, null, null, null, null, null, null, null, null, null, - null, null); + null, null, null, null); // Even though 2 entities out of 4 have same created time, one entity // is left out due to limit Assert.assertEquals(3, result.size()); @@ -383,7 +383,7 @@ public void testGetEntitiesByTimeWindows() throws Exception { Set result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, 1425016502030L, 1425016502060L, null, null, null, null, null, - null, null, null, null); + null, null, null, null, null, null); Assert.assertEquals(1, result.size()); // Only one entity with ID id_4 should be returned. for (TimelineEntity entity : result) { @@ -396,7 +396,7 @@ public void testGetEntitiesByTimeWindows() throws Exception { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, 1425016502010L, null, null, null, null, null, null, - null, null, null); + null, null, null, null, null); Assert.assertEquals(3, result.size()); for (TimelineEntity entity : result) { if (entity.getId().equals("id_4")) { @@ -408,7 +408,7 @@ public void testGetEntitiesByTimeWindows() throws Exception { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, 1425016502010L, null, null, null, null, null, null, null, - null, null, null); + null, null, null, null, null); Assert.assertEquals(1, result.size()); for (TimelineEntity entity : result) { if (!entity.getId().equals("id_4")) { @@ -420,7 +420,7 @@ public void testGetEntitiesByTimeWindows() throws Exception { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, 1425016502090L, 1425016503020L, null, null, null, - null, null, null, null); + null, null, null, null, null, null); Assert.assertEquals(2, result.size()); // Two entities with IDs id_1 and id_4 should be returned.
for (TimelineEntity entity : result) { @@ -433,7 +433,7 @@ public void testGetEntitiesByTimeWindows() throws Exception { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, null, 1425016502090L, null, null, null, null, - null, null, null); + null, null, null, null, null); Assert.assertEquals(2, result.size()); for (TimelineEntity entity : result) { if (entity.getId().equals("id_1") || entity.getId().equals("id_4")) { @@ -445,7 +445,7 @@ public void testGetEntitiesByTimeWindows() throws Exception { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, 1425016503005L, null, null, null, null, null, - null, null, null); + null, null, null, null, null); Assert.assertEquals(1, result.size()); for (TimelineEntity entity : result) { if (!entity.getId().equals("id_4")) { @@ -462,7 +462,7 @@ public void testGetFilteredEntities() throws Exception { Set result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, null, null, null, null, infoFilters, null, null, - null, null); + null, null, null, null); Assert.assertEquals(1, result.size()); // Only one entity with ID id_3 should be returned. for (TimelineEntity entity : result) { @@ -478,7 +478,7 @@ public void testGetFilteredEntities() throws Exception { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, null, null, null, null, null, configFilters, null, - null, null); + null, null, null, null); Assert.assertEquals(2, result.size()); for (TimelineEntity entity : result) { if (!entity.getId().equals("id_1") && !entity.getId().equals("id_3")) { @@ -493,7 +493,7 @@ public void testGetFilteredEntities() throws Exception { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, null, null, null, null, null, null, null, - eventFilters, null); + eventFilters, null, null, null); Assert.assertEquals(1, result.size()); for (TimelineEntity entity : result) { if (!entity.getId().equals("id_3")) { @@ -507,7 +507,7 @@ public void testGetFilteredEntities() throws Exception { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, null, null, null, null, null, null, metricFilters, - null, null); + null, null, null, null); Assert.assertEquals(2, result.size()); // Two entities with IDs' id_1 and id_2 should be returned. for (TimelineEntity entity : result) { @@ -527,7 +527,7 @@ public void testGetEntitiesByRelations() throws Exception { Set result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, null, null, relatesTo, null, null, null, null, - null, null); + null, null, null, null); Assert.assertEquals(1, result.size()); // Only one entity with ID id_1 should be returned. for (TimelineEntity entity : result) { @@ -544,7 +544,7 @@ public void testGetEntitiesByRelations() throws Exception { result = reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app", null, null, null, null, null, null, isRelatedTo, null, null, null, - null, null); + null, null, null, null); Assert.assertEquals(2, result.size()); // Two entities with IDs' id_1 and id_3 should be returned. 
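Throughout the FileSystem-reader tests above, the two parameters added to getEntity/getEntities (a configs filter list and a metrics filter list, placed just before fieldsToRetrieve) are passed as null, which preserves the old return-everything behavior. For a caller that does want to trim the payload, here is a hedged sketch of the same call with real filter lists, using only classes this patch introduces:

// Sketch only: restrict the returned configs to keys starting with "cfg_"
// and the returned metrics to ids starting with "MAP1_".
TimelineFilterList confsToRetrieve = new TimelineFilterList(Operator.OR,
    new TimelinePrefixFilter(CompareOp.EQUAL, "cfg_"));
TimelineFilterList metricsToRetrieve = new TimelineFilterList(Operator.OR,
    new TimelinePrefixFilter(CompareOp.EQUAL, "MAP1_"));
TimelineEntity trimmed = reader.getEntity("user1", "cluster1", "flow1", 1L,
    "app1", "app", "id_1", confsToRetrieve, metricsToRetrieve,
    EnumSet.of(Field.CONFIGS, Field.METRICS));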
for (TimelineEntity entity : result) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineReaderImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineReaderImpl.java new file mode 100644 index 0000000..8fc8a64 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineReaderImpl.java @@ -0,0 +1,614 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter.CompareOp; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestHBaseTimelineReaderImpl { + + private static HBaseTestingUtility util; + private HBaseTimelineReaderImpl reader; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + util = new HBaseTestingUtility(); + Configuration 
conf = util.getConfiguration(); + conf.setInt("hfile.format.version", 3); + util.startMiniCluster(); + createSchema(); + loadEntities(); + loadApps(); + } + + @Before + public void init() throws Exception { + reader = new HBaseTimelineReaderImpl(); + reader.init(util.getConfiguration()); + reader.start(); + } + + @After + public void stop() throws Exception { + if (reader != null) { + reader.stop(); + reader.close(); + } + } + + private static void createSchema() throws IOException { + TimelineSchemaCreator.createAllTables(util.getConfiguration(), false); + } + + private static void loadApps() throws IOException { + TimelineEntities te = new TimelineEntities(); + TimelineEntity entity = new TimelineEntity(); + String id = "application_1111111111_2222"; + entity.setId(id); + entity.setType(TimelineEntityType.YARN_APPLICATION.toString()); + Long cTime = 1425016501000L; + Long mTime = 1425026901000L; + entity.setCreatedTime(cTime); + entity.setModifiedTime(mTime); + // add the info map in Timeline Entity + Map infoMap = new HashMap(); + infoMap.put("infoMapKey1", "infoMapValue1"); + infoMap.put("infoMapKey2", 10); + entity.addInfo(infoMap); + // add the isRelatedToEntity info + String key = "task"; + String value = "is_related_to_entity_id_here"; + Set isRelatedToSet = new HashSet(); + isRelatedToSet.add(value); + Map> isRelatedTo = new HashMap>(); + isRelatedTo.put(key, isRelatedToSet); + entity.setIsRelatedToEntities(isRelatedTo); + // add the relatesTo info + key = "container"; + value = "relates_to_entity_id_here"; + Set relatesToSet = new HashSet(); + relatesToSet.add(value); + value = "relates_to_entity_id_here_Second"; + relatesToSet.add(value); + Map> relatesTo = new HashMap>(); + relatesTo.put(key, relatesToSet); + entity.setRelatesToEntities(relatesTo); + // add some config entries + Map conf = new HashMap(); + conf.put("config_param1", "value1"); + conf.put("config_param2", "value2"); + conf.put("cfg_param1", "value3"); + entity.addConfigs(conf); + // add metrics + Set metrics = new HashSet<>(); + TimelineMetric m1 = new TimelineMetric(); + m1.setId("MAP_SLOT_MILLIS"); + Map metricValues = new HashMap(); + long ts = System.currentTimeMillis(); + metricValues.put(ts - 120000, 100000000); + metricValues.put(ts - 100000, 200000000); + metricValues.put(ts - 80000, 300000000); + metricValues.put(ts - 60000, 400000000); + metricValues.put(ts - 40000, 50000000000L); + metricValues.put(ts - 20000, 60000000000L); + m1.setType(Type.TIME_SERIES); + m1.setValues(metricValues); + metrics.add(m1); + + TimelineMetric m12 = new TimelineMetric(); + m12.setId("MAP1_BYTES"); + m12.addValue(ts, 50); + metrics.add(m12); + entity.addMetrics(metrics); + TimelineEvent event = new TimelineEvent(); + event.setId("event1"); + event.setTimestamp(ts - 2000); + entity.addEvent(event); + te.addEntity(entity); + + TimelineEntities te1 = new TimelineEntities(); + TimelineEntity entity1 = new TimelineEntity(); + String id1 = "application_1111111111_3333"; + entity1.setId(id1); + entity1.setType(TimelineEntityType.YARN_APPLICATION.toString()); + entity1.setCreatedTime(cTime); + entity1.setModifiedTime(mTime); + + // add the info map in Timeline Entity + Map infoMap1 = new HashMap(); + infoMap1.put("infoMapKey1", "infoMapValue1"); + infoMap1.put("infoMapKey2", 10); + entity1.addInfo(infoMap1); + + // add the isRelatedToEntity info + String key1 = "task"; + String value1 = "is_related_to_entity_id_here"; + Set isRelatedToSet1 = new HashSet(); + isRelatedToSet1.add(value1); + Map> isRelatedTo1 = new HashMap>(); + 
isRelatedTo1.put(key, isRelatedToSet1); + entity1.setIsRelatedToEntities(isRelatedTo1); + + // add the relatesTo info + key1 = "container"; + value1 = "relates_to_entity_id_here"; + Set relatesToSet1 = new HashSet(); + relatesToSet1.add(value1); + value1 = "relates_to_entity_id_here_Second"; + relatesToSet1.add(value1); + Map> relatesTo1 = new HashMap>(); + relatesTo1.put(key1, relatesToSet1); + entity1.setRelatesToEntities(relatesTo1); + + // add some config entries + Map conf1 = new HashMap(); + conf1.put("cfg_param1", "value1"); + conf1.put("cfg_param2", "value2"); + entity1.addConfigs(conf1); + + // add metrics + Set metrics1 = new HashSet<>(); + TimelineMetric m2 = new TimelineMetric(); + m2.setId("MAP1_SLOT_MILLIS"); + Map metricValues1 = new HashMap(); + long ts1 = System.currentTimeMillis(); + metricValues1.put(ts1 - 120000, 100000000); + metricValues1.put(ts1 - 100000, 200000000); + metricValues1.put(ts1 - 80000, 300000000); + metricValues1.put(ts1 - 60000, 400000000); + metricValues1.put(ts1 - 40000, 50000000000L); + metricValues1.put(ts1 - 20000, 60000000000L); + m2.setType(Type.TIME_SERIES); + m2.setValues(metricValues1); + metrics1.add(m2); + entity1.addMetrics(metrics1); + te1.addEntity(entity1); + + TimelineEntities te2 = new TimelineEntities(); + TimelineEntity entity2 = new TimelineEntity(); + String id2 = "application_1111111111_4444"; + entity2.setId(id2); + entity2.setType(TimelineEntityType.YARN_APPLICATION.toString()); + entity2.setCreatedTime(cTime); + entity2.setModifiedTime(mTime); + te2.addEntity(entity2); + HBaseTimelineWriterImpl hbi = null; + try { + hbi = new HBaseTimelineWriterImpl(util.getConfiguration()); + hbi.init(util.getConfiguration()); + hbi.start(); + String cluster = "cluster1"; + String user = "user1"; + String flow = "some_flow_name"; + String flowVersion = "AB7822C10F1111"; + long runid = 1002345678919L; + String appName = "application_1111111111_2222"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + appName = "application_1111111111_3333"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); + appName = "application_1111111111_4444"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te2); + hbi.stop(); + } finally { + if (hbi != null) { + hbi.stop(); + hbi.close(); + } + } + } + + private static void loadEntities() throws IOException { + TimelineEntities te = new TimelineEntities(); + TimelineEntity entity = new TimelineEntity(); + String id = "hello"; + String type = "world"; + entity.setId(id); + entity.setType(type); + Long cTime = 1425016501000L; + Long mTime = 1425026901000L; + entity.setCreatedTime(cTime); + entity.setModifiedTime(mTime); + // add the info map in Timeline Entity + Map infoMap = new HashMap(); + infoMap.put("infoMapKey1", "infoMapValue1"); + infoMap.put("infoMapKey2", 10); + entity.addInfo(infoMap); + // add the isRelatedToEntity info + String key = "task"; + String value = "is_related_to_entity_id_here"; + Set isRelatedToSet = new HashSet(); + isRelatedToSet.add(value); + Map> isRelatedTo = new HashMap>(); + isRelatedTo.put(key, isRelatedToSet); + entity.setIsRelatedToEntities(isRelatedTo); + + // add the relatesTo info + key = "container"; + value = "relates_to_entity_id_here"; + Set relatesToSet = new HashSet(); + relatesToSet.add(value); + value = "relates_to_entity_id_here_Second"; + relatesToSet.add(value); + Map> relatesTo = new HashMap>(); + relatesTo.put(key, relatesToSet); + entity.setRelatesToEntities(relatesTo); + + // add some config entries + Map conf = new 
HashMap(); + conf.put("config_param1", "value1"); + conf.put("config_param2", "value2"); + conf.put("cfg_param1", "value3"); + entity.addConfigs(conf); + + // add metrics + Set metrics = new HashSet<>(); + TimelineMetric m1 = new TimelineMetric(); + m1.setId("MAP_SLOT_MILLIS"); + Map metricValues = new HashMap(); + long ts = System.currentTimeMillis(); + metricValues.put(ts - 120000, 100000000); + metricValues.put(ts - 100000, 200000000); + metricValues.put(ts - 80000, 300000000); + metricValues.put(ts - 60000, 400000000); + metricValues.put(ts - 40000, 50000000000L); + metricValues.put(ts - 20000, 60000000000L); + m1.setType(Type.TIME_SERIES); + m1.setValues(metricValues); + metrics.add(m1); + + TimelineMetric m12 = new TimelineMetric(); + m12.setId("MAP1_BYTES"); + m12.addValue(ts, 50); + metrics.add(m12); + entity.addMetrics(metrics); + te.addEntity(entity); + + TimelineEntity entity1 = new TimelineEntity(); + String id1 = "hello1"; + entity1.setId(id1); + entity1.setType(type); + entity1.setCreatedTime(cTime); + entity1.setModifiedTime(mTime); + + // add the info map in Timeline Entity + Map infoMap1 = new HashMap(); + infoMap1.put("infoMapKey1", "infoMapValue1"); + infoMap1.put("infoMapKey2", 10); + entity1.addInfo(infoMap1); + + // add the isRelatedToEntity info + String key1 = "task"; + String value1 = "is_related_to_entity_id_here"; + Set isRelatedToSet1 = new HashSet(); + isRelatedToSet1.add(value1); + Map> isRelatedTo1 = new HashMap>(); + isRelatedTo1.put(key, isRelatedToSet1); + entity1.setIsRelatedToEntities(isRelatedTo1); + + // add the relatesTo info + key1 = "container"; + value1 = "relates_to_entity_id_here"; + Set relatesToSet1 = new HashSet(); + relatesToSet1.add(value1); + value1 = "relates_to_entity_id_here_Second"; + relatesToSet1.add(value1); + Map> relatesTo1 = new HashMap>(); + relatesTo1.put(key1, relatesToSet1); + entity1.setRelatesToEntities(relatesTo1); + + // add some config entries + Map conf1 = new HashMap(); + conf1.put("cfg_param1", "value1"); + conf1.put("cfg_param2", "value2"); + entity1.addConfigs(conf1); + + // add metrics + Set metrics1 = new HashSet<>(); + TimelineMetric m2 = new TimelineMetric(); + m2.setId("MAP1_SLOT_MILLIS"); + Map metricValues1 = new HashMap(); + long ts1 = System.currentTimeMillis(); + metricValues1.put(ts1 - 120000, 100000000); + metricValues1.put(ts1 - 100000, 200000000); + metricValues1.put(ts1 - 80000, 300000000); + metricValues1.put(ts1 - 60000, 400000000); + metricValues1.put(ts1 - 40000, 50000000000L); + metricValues1.put(ts1 - 20000, 60000000000L); + m2.setType(Type.TIME_SERIES); + m2.setValues(metricValues1); + metrics1.add(m2); + entity1.addMetrics(metrics1); + te.addEntity(entity1); + + TimelineEntity entity2 = new TimelineEntity(); + String id2 = "hello2"; + entity2.setId(id2); + entity2.setType(type); + entity2.setCreatedTime(cTime); + entity2.setModifiedTime(mTime); + te.addEntity(entity2); + HBaseTimelineWriterImpl hbi = null; + try { + hbi = new HBaseTimelineWriterImpl(util.getConfiguration()); + hbi.init(util.getConfiguration()); + hbi.start(); + String cluster = "cluster1"; + String user = "user1"; + String flow = "some_flow_name"; + String flowVersion = "AB7822C10F1111"; + long runid = 1002345678919L; + String appName = "application_1111111111_1111"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.stop(); + } finally { + if (hbi != null) { + hbi.stop(); + hbi.close(); + } + } + } + + @Test + public void testReadEntities() throws Exception { + TimelineEntity e1 = 
reader.getEntity("user1", "cluster1", "some_flow_name", + 1002345678919L, "application_1111111111_1111","world", "hello", null, + null, EnumSet.of(Field.ALL)); + assertNotNull(e1); + assertEquals(3, e1.getConfigs().size()); + assertEquals(1, e1.getIsRelatedToEntities().size()); + Set es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, "application_1111111111_1111","world", + null, null, null, null, null, null, null, null, null, null, null, null, + null, EnumSet.of(Field.ALL)); + assertEquals(3, es1.size()); + } + + @Test + public void testReadEntitiesDefaultView() throws Exception { + TimelineEntity e1 = + reader.getEntity("user1", "cluster1", "some_flow_name", 1002345678919L, + "application_1111111111_1111","world", "hello", null, null, null); + assertNotNull(e1); + assertTrue(e1.getInfo().isEmpty() && e1.getConfigs().isEmpty() && + e1.getMetrics().isEmpty() && e1.getIsRelatedToEntities().isEmpty() && + e1.getRelatesToEntities().isEmpty()); + Set es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, "application_1111111111_1111","world", + null, null, null, null, null, null, null, null, null, null, null, null, + null, null); + assertEquals(3, es1.size()); + for (TimelineEntity e : es1) { + assertTrue(e.getInfo().isEmpty() && e.getConfigs().isEmpty() && + e.getMetrics().isEmpty() && e.getIsRelatedToEntities().isEmpty() && + e.getRelatesToEntities().isEmpty()); + } + } + + @Test + public void testReadEntitiesByFields() throws Exception { + TimelineEntity e1 = + reader.getEntity("user1", "cluster1", "some_flow_name", 1002345678919L, + "application_1111111111_1111","world", "hello", null, null, + EnumSet.of(Field.INFO, Field.CONFIGS)); + assertNotNull(e1); + assertEquals(3, e1.getConfigs().size()); + assertEquals(0, e1.getIsRelatedToEntities().size()); + Set es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, "application_1111111111_1111","world", + null, null, null, null, null, null, null, null, null, null, null, null, + null, EnumSet.of(Field.IS_RELATED_TO, Field.METRICS)); + assertEquals(3, es1.size()); + int metricsCnt = 0; + int isRelatedToCnt = 0; + int infoCnt = 0; + for (TimelineEntity entity : es1) { + metricsCnt += entity.getMetrics().size(); + isRelatedToCnt += entity.getIsRelatedToEntities().size(); + infoCnt += entity.getInfo().size(); + } + assertEquals(0, infoCnt); + assertEquals(2, isRelatedToCnt); + assertEquals(3, metricsCnt); + } + + @Test + public void testReadEntitiesConfigPrefix() throws Exception { + TimelineFilterList list = + new TimelineFilterList(Operator.OR, + new TimelinePrefixFilter(CompareOp.EQUAL, "cfg_")); + TimelineEntity e1 = + reader.getEntity("user1", "cluster1", "some_flow_name", 1002345678919L, + "application_1111111111_1111","world", "hello", list, null, null); + assertNotNull(e1); + assertEquals(1, e1.getConfigs().size()); + Set es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, "application_1111111111_1111","world", + null, null, null, null, null, null, null, null, null, null, null, + list, null, null); + int cfgCnt = 0; + for (TimelineEntity entity : es1) { + cfgCnt += entity.getConfigs().size(); + } + assertEquals(3, cfgCnt); + } + + @Test + public void testReadEntitiesMetricPrefix() throws Exception { + TimelineFilterList list = + new TimelineFilterList(Operator.OR, + new TimelinePrefixFilter(CompareOp.EQUAL, "MAP1_")); + TimelineEntity e1 = + reader.getEntity("user1", "cluster1", "some_flow_name", 1002345678919L, + 
"application_1111111111_1111","world", "hello", null, list, null); + assertNotNull(e1); + assertEquals(1, e1.getMetrics().size()); + Set es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, "application_1111111111_1111","world", + null, null, null, null, null, null, null, null, null, null, null, null, + list, null); + int metricCnt = 0; + for (TimelineEntity entity : es1) { + metricCnt += entity.getMetrics().size(); + } + assertEquals(2, metricCnt); + } + + @Test + public void testReadApps() throws Exception { + TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name", + 1002345678919L, "application_1111111111_2222", + TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, + EnumSet.of(Field.ALL)); + assertNotNull(e1); + assertEquals(3, e1.getConfigs().size()); + assertEquals(1, e1.getIsRelatedToEntities().size()); + Set es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, null, + TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null, + null, null, null, null, null, null, null, null, null, + EnumSet.of(Field.ALL)); + assertEquals(3, es1.size()); + } + + @Test + public void testReadAppsDefaultView() throws Exception { + TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name", + 1002345678919L, "application_1111111111_2222", + TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null); + assertNotNull(e1); + assertTrue(e1.getInfo().isEmpty() && e1.getConfigs().isEmpty() && + e1.getMetrics().isEmpty() && e1.getIsRelatedToEntities().isEmpty() && + e1.getRelatesToEntities().isEmpty()); + Set es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, null, + TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null, + null, null, null, null, null, null, null, null, null, null); + assertEquals(3, es1.size()); + for (TimelineEntity e : es1) { + assertTrue(e.getInfo().isEmpty() && e.getConfigs().isEmpty() && + e.getMetrics().isEmpty() && e.getIsRelatedToEntities().isEmpty() && + e.getRelatesToEntities().isEmpty()); + } + } + + @Test + public void testReadAppsByFields() throws Exception { + TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name", + 1002345678919L, "application_1111111111_2222", + TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, + EnumSet.of(Field.INFO, Field.CONFIGS)); + assertNotNull(e1); + assertEquals(3, e1.getConfigs().size()); + assertEquals(0, e1.getIsRelatedToEntities().size()); + Set es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, null, + TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null, + null, null, null, null, null, null, null, null, null, + EnumSet.of(Field.IS_RELATED_TO, Field.METRICS)); + assertEquals(3, es1.size()); + int metricsCnt = 0; + int isRelatedToCnt = 0; + int infoCnt = 0; + for (TimelineEntity entity : es1) { + metricsCnt += entity.getMetrics().size(); + isRelatedToCnt += entity.getIsRelatedToEntities().size(); + infoCnt += entity.getInfo().size(); + } + assertEquals(0, infoCnt); + assertEquals(2, isRelatedToCnt); + assertEquals(3, metricsCnt); + } + + @Test + public void testReadAppsConfigPrefix() throws Exception { + TimelineFilterList list = + new TimelineFilterList(Operator.OR, + new TimelinePrefixFilter(CompareOp.EQUAL, "cfg_")); + TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name", + 1002345678919L, "application_1111111111_2222", + 
TimelineEntityType.YARN_APPLICATION.toString(), null, list, null, null); + assertNotNull(e1); + assertEquals(1, e1.getConfigs().size()); + Set es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, null, + TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null, + null, null, null, null, null, null, null, list, null, null); + int cfgCnt = 0; + for (TimelineEntity entity : es1) { + cfgCnt += entity.getConfigs().size(); + } + assertEquals(3, cfgCnt); + } + + @Test + public void testReadAppsMetricPrefix() throws Exception { + TimelineFilterList list = + new TimelineFilterList(Operator.OR, + new TimelinePrefixFilter(CompareOp.EQUAL, "MAP1_")); + TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name", + 1002345678919L, "application_1111111111_2222", + TimelineEntityType.YARN_APPLICATION.toString(), null, null, list, null); + assertNotNull(e1); + assertEquals(1, e1.getMetrics().size()); + Set es1 = reader.getEntities("user1", "cluster1", + "some_flow_name", 1002345678919L, null, + TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null, + null, null, null, null, null, null, null, null, list, null); + int metricCnt = 0; + for (TimelineEntity entity : es1) { + metricCnt += entity.getMetrics().size(); + } + assertEquals(2, metricCnt); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java index 701615e..b17a0bd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java @@ -247,7 +247,7 @@ public void testWriteApplicationToHBase() throws Exception { // read the timeline entity using the reader this time TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appId, - entity.getType(), entity.getId(), + entity.getType(), entity.getId(), null, null, EnumSet.of(TimelineReader.Field.ALL)); assertNotNull(e1); @@ -459,11 +459,12 @@ public void testWriteEntityToHBase() throws Exception { // read the timeline entity using the reader this time TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName, - entity.getType(), entity.getId(), + entity.getType(), entity.getId(), null, null, EnumSet.of(TimelineReader.Field.ALL)); Set es1 = hbr.getEntities(user, cluster, flow, runid, appName, entity.getType(), null, null, null, null, null, null, null, - null, null, null, null, EnumSet.of(TimelineReader.Field.ALL)); + null, null, null, null, null, null, + EnumSet.of(TimelineReader.Field.ALL)); assertNotNull(e1); assertEquals(1, es1.size()); @@ -603,10 +604,10 @@ public void testEvents() throws IOException { // read the timeline entity using the reader this time TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName, - entity.getType(), entity.getId(), + entity.getType(), entity.getId(), null, null, EnumSet.of(TimelineReader.Field.ALL)); TimelineEntity e2 = 
hbr.getEntity(user, cluster, null, null, appName, - entity.getType(), entity.getId(), + entity.getType(), entity.getId(), null, null, EnumSet.of(TimelineReader.Field.ALL)); assertNotNull(e1); assertNotNull(e2); @@ -717,11 +718,12 @@ public void testEventsWithEmptyInfo() throws IOException { // read the timeline entity using the reader this time TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName, - entity.getType(), entity.getId(), + entity.getType(), entity.getId(), null, null, EnumSet.of(TimelineReader.Field.ALL)); Set es1 = hbr.getEntities(user, cluster, flow, runid, appName, entity.getType(), null, null, null, null, null, null, null, - null, null, null, null, EnumSet.of(TimelineReader.Field.ALL)); + null, null, null, null, null, null, + EnumSet.of(TimelineReader.Field.ALL)); assertNotNull(e1); assertEquals(1, es1.size()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java index c957dad..434adac 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java @@ -182,7 +182,7 @@ public void testWriteFlowRunMinMax() throws Exception { Set entities = hbr.getEntities(null, cluster, null, null, null, TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null, - null, null, null, null, null, null, null, null, null); + null, null, null, null, null, null, null, null, null, null, null); assertEquals(1, entities.size()); for (TimelineEntity e : entities) { FlowActivityEntity flowActivity = (FlowActivityEntity)e; @@ -238,7 +238,7 @@ public void testWriteFlowActivityOneFlow() throws Exception { Set entities = hbr.getEntities(user, cluster, flow, null, null, TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null, - null, null, null, null, null, null, null, null, null); + null, null, null, null, null, null, null, null, null, null, null); assertEquals(1, entities.size()); for (TimelineEntity e : entities) { FlowActivityEntity entity = (FlowActivityEntity)e; @@ -353,7 +353,7 @@ public void testFlowActivityTableOneFlowMultipleRunIds() throws IOException { Set entities = hbr.getEntities(null, cluster, null, null, null, TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null, - null, null, null, null, null, null, null, null, null); + null, null, null, null, null, null, null, null, null, null, null); assertEquals(1, entities.size()); for (TimelineEntity e : entities) { FlowActivityEntity flowActivity = (FlowActivityEntity)e; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java index b0f83b7..b1e4fb3 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java @@ -24,6 +24,7 @@ import static org.junit.Assert.fail; import java.io.IOException; +import java.util.EnumSet; import java.util.Map; import java.util.Set; @@ -43,9 +44,13 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter.CompareOp; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -177,9 +182,8 @@ public void testWriteFlowRunMinMax() throws Exception { hbr.init(c1); hbr.start(); // get the flow run entity - TimelineEntity entity = - hbr.getEntity(user, cluster, flow, runid, null, - TimelineEntityType.YARN_FLOW_RUN.toString(), null, null); + TimelineEntity entity = hbr.getEntity(user, cluster, flow, runid, null, + TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null, null); assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType())); FlowRunEntity flowRun = (FlowRunEntity)entity; assertEquals(minStartTs, flowRun.getStartTime()); @@ -237,9 +241,8 @@ public void testWriteFlowRunMetricsOneFlow() throws Exception { hbr = new HBaseTimelineReaderImpl(); hbr.init(c1); hbr.start(); - TimelineEntity entity = - hbr.getEntity(user, cluster, flow, runid, null, - TimelineEntityType.YARN_FLOW_RUN.toString(), null, null); + TimelineEntity entity = hbr.getEntity(user, cluster, flow, runid, null, + TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null, null); assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType())); Set metrics = entity.getMetrics(); assertEquals(2, metrics.size()); @@ -304,6 +307,181 @@ private void checkFlowRunTable(String cluster, String user, String flow, assertEquals(1, rowCount); } + @Test + public void testWriteFlowRunMetricsPrefix() throws Exception { + String cluster = "testWriteFlowRunMetricsOneFlow_cluster1"; + String user = "testWriteFlowRunMetricsOneFlow_user1"; + String flow = "testing_flowRun_metrics_flow_name"; + String flowVersion = "CF7022C10F1354"; + long runid = 1002345678919L; + + TimelineEntities te = new TimelineEntities(); + TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(); + te.addEntity(entityApp1); + + HBaseTimelineWriterImpl hbi = null; + Configuration c1 = util.getConfiguration(); + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + String appName = "application_11111111111111_1111"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + // 
write another application with same metric to this flow + te = new TimelineEntities(); + TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(); + te.addEntity(entityApp2); + appName = "application_11111111111111_2222"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.flush(); + } finally { + hbi.close(); + } + + // check flow run + checkFlowRunTable(cluster, user, flow, runid, c1); + + // use the timeline reader to verify data + HBaseTimelineReaderImpl hbr = null; + try { + hbr = new HBaseTimelineReaderImpl(); + hbr.init(c1); + hbr.start(); + TimelineFilterList metricsToRetrieve = + new TimelineFilterList(new TimelinePrefixFilter(CompareOp.EQUAL, + metric1.substring(0, metric1.indexOf("_") + 1))); + TimelineEntity entity = hbr.getEntity(user, cluster, flow, runid, null, + TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, + metricsToRetrieve, null); + assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType())); + Set metrics = entity.getMetrics(); + assertEquals(1, metrics.size()); + for (TimelineMetric metric : metrics) { + String id = metric.getId(); + Map values = metric.getValues(); + assertEquals(1, values.size()); + Number value = null; + for (Number n : values.values()) { + value = n; + } + switch (id) { + case metric1: + assertEquals(141, value); + break; + default: + fail("unrecognized metric: " + id); + } + } + + Set entities = hbr.getEntities(user, cluster, flow, runid, + null, TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null, + null, null, null, null, null, null, null, null, null, + metricsToRetrieve, null); + assertEquals(1, entities.size()); + for (TimelineEntity timelineEntity : entities) { + Set timelineMetrics = timelineEntity.getMetrics(); + assertEquals(1, timelineMetrics.size()); + for (TimelineMetric metric : timelineMetrics) { + String id = metric.getId(); + Map values = metric.getValues(); + assertEquals(1, values.size()); + Number value = null; + for (Number n : values.values()) { + value = n; + } + switch (id) { + case metric1: + assertEquals(141, value); + break; + default: + fail("unrecognized metric: " + id); + } + } + } + } finally { + hbr.close(); + } + } + + @Test + public void testWriteFlowRunsMetricFields() throws Exception { + String cluster = "testWriteFlowRunMetricsOneFlow_cluster1"; + String user = "testWriteFlowRunMetricsOneFlow_user1"; + String flow = "testing_flowRun_metrics_flow_name"; + String flowVersion = "CF7022C10F1354"; + long runid = 1002345678919L; + + TimelineEntities te = new TimelineEntities(); + TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(); + te.addEntity(entityApp1); + + HBaseTimelineWriterImpl hbi = null; + Configuration c1 = util.getConfiguration(); + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + String appName = "application_11111111111111_1111"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + // write another application with same metric to this flow + te = new TimelineEntities(); + TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(); + te.addEntity(entityApp2); + appName = "application_11111111111111_2222"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.flush(); + } finally { + hbi.close(); + } + + // check flow run + checkFlowRunTable(cluster, user, flow, runid, c1); + + // use the timeline reader to verify data + HBaseTimelineReaderImpl hbr = null; + try { + hbr = new HBaseTimelineReaderImpl(); + hbr.init(c1); + hbr.start(); + Set entities = 
hbr.getEntities(user, cluster, flow, runid, + null, TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null, + null, null, null, null, null, null, null, null, null, null, null); + assertEquals(1, entities.size()); + for (TimelineEntity timelineEntity : entities) { + assertEquals(0, timelineEntity.getMetrics().size()); + } + + entities = hbr.getEntities(user, cluster, flow, runid, + null, TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null, + null, null, null, null, null, null, null, null, null, + null, EnumSet.of(Field.METRICS)); + assertEquals(1, entities.size()); + for (TimelineEntity timelineEntity : entities) { + Set timelineMetrics = timelineEntity.getMetrics(); + assertEquals(2, timelineMetrics.size()); + for (TimelineMetric metric : timelineMetrics) { + String id = metric.getId(); + Map values = metric.getValues(); + assertEquals(1, values.size()); + Number value = null; + for (Number n : values.values()) { + value = n; + } + switch (id) { + case metric1: + assertEquals(141, value); + break; + case metric2: + assertEquals(57, value); + break; + default: + fail("unrecognized metric: " + id); + } + } + } + } finally { + hbr.close(); + } + } + @AfterClass public static void tearDownAfterClass() throws Exception { util.shutdownMiniCluster();
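Taken together, the flow-run assertions above pin down the new contract: a flow-run read without Field.METRICS and without a metricsToRetrieve list returns no metric values, while a TimelinePrefixFilter narrows the returned series to matching ids. A compact usage sketch of that read path, reusing the tests' own names (metric1 is the test class's metric-id constant):

// Keep only metrics whose id shares metric1's prefix up to the first '_'.
TimelineFilterList metricsToRetrieve = new TimelineFilterList(
    new TimelinePrefixFilter(CompareOp.EQUAL,
        metric1.substring(0, metric1.indexOf("_") + 1)));
TimelineEntity flowRun = hbr.getEntity(user, cluster, flow, runid, null,
    TimelineEntityType.YARN_FLOW_RUN.toString(), null, null,
    metricsToRetrieve, null);
// Each surviving metric carries its aggregated value for the run.
for (TimelineMetric metric : flowRun.getMetrics()) {
  System.out.println(metric.getId() + " -> " + metric.getValues());
}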