diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java index 7440316..a82c5d8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.reader; import java.io.IOException; +import java.nio.charset.Charset; import java.util.EnumSet; import java.util.Set; @@ -258,7 +259,8 @@ private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) { * @throws IOException if any problem occurs while updating filter list. */ private void updateFilterForConfsAndMetricsToRetrieve( - FilterList listBasedOnFields) throws IOException { + FilterList listBasedOnFields, Set cfsInFields) + throws IOException { TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); // Please note that if confsToRetrieve is specified, we would have added // CONFS to fields to retrieve in augmentParams() even if not specified. 
@@ -268,6 +270,9 @@ private void updateFilterForConfsAndMetricsToRetrieve( createFilterForConfsOrMetricsToRetrieve( dataToRetrieve.getConfsToRetrieve(), ApplicationColumnFamily.CONFIGS, ApplicationColumnPrefix.CONFIG)); + cfsInFields.add( + new String(ApplicationColumnFamily.CONFIGS.getBytes(), + Charset.forName("UTF-8"))); } // Please note that if metricsToRetrieve is specified, we would have added @@ -278,11 +283,15 @@ private void updateFilterForConfsAndMetricsToRetrieve( createFilterForConfsOrMetricsToRetrieve( dataToRetrieve.getMetricsToRetrieve(), ApplicationColumnFamily.METRICS, ApplicationColumnPrefix.METRIC)); + cfsInFields.add( + new String(ApplicationColumnFamily.METRICS.getBytes(), + Charset.forName("UTF-8"))); } } @Override - protected FilterList constructFilterListBasedOnFields() throws IOException { + protected FilterList constructFilterListBasedOnFields(Set cfsInFields) + throws IOException { if (!needCreateFilterListBasedOnFields()) { // Fetch all the columns. No need of a filter. 
return null; @@ -303,8 +312,11 @@ protected FilterList constructFilterListBasedOnFields() throws IOException { excludeFieldsFromInfoColFamily(infoColFamilyList); } listBasedOnFields.addFilter(infoColFamilyList); + cfsInFields.add( + new String(ApplicationColumnFamily.INFO.getBytes(), + Charset.forName("UTF-8"))); - updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields); + updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields, cfsInFields); return listBasedOnFields; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java index d0a0f3b..7b7eef5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; @@ -97,7 +98,8 @@ protected FilterList constructFilterListBasedOnFilters() throws IOException { } @Override - protected FilterList constructFilterListBasedOnFields() { + protected FilterList constructFilterListBasedOnFields( + Set cfsInFields) { return null; } diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java index 33a2cf6..0d2b3e6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java @@ -18,7 +18,9 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.reader; import java.io.IOException; +import java.nio.charset.Charset; import java.util.EnumSet; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; @@ -152,7 +154,8 @@ private FilterList updateFixedColumns() { } @Override - protected FilterList constructFilterListBasedOnFields() throws IOException { + protected FilterList constructFilterListBasedOnFields( + Set cfsInFields) throws IOException { FilterList list = new FilterList(Operator.MUST_PASS_ONE); // By default fetch everything in INFO column family. 
FamilyFilter infoColumnFamily = @@ -166,6 +169,9 @@ protected FilterList constructFilterListBasedOnFields() throws IOException { && !hasField(dataToRetrieve.getFieldsToRetrieve(), Field.METRICS)) { FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE); infoColFamilyList.addFilter(infoColumnFamily); + cfsInFields.add( + new String(FlowRunColumnFamily.INFO.getBytes(), + Charset.forName("UTF-8"))); infoColFamilyList.addFilter(new QualifierFilter(CompareOp.NOT_EQUAL, new BinaryPrefixComparator(FlowRunColumnPrefix.METRIC .getColumnPrefixBytes("")))); @@ -182,6 +188,9 @@ protected FilterList constructFilterListBasedOnFields() throws IOException { && !metricsToRetrieve.getFilterList().isEmpty()) { FilterList infoColFamilyList = new FilterList(); infoColFamilyList.addFilter(infoColumnFamily); + cfsInFields.add( + new String(FlowRunColumnFamily.INFO.getBytes(), + Charset.forName("UTF-8"))); FilterList columnsList = updateFixedColumns(); columnsList.addFilter(TimelineFilterUtils.createHBaseFilterList( FlowRunColumnPrefix.METRIC, metricsToRetrieve)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java index 02eca84..ee95c40 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.reader; import java.io.IOException; +import java.nio.charset.Charset; import java.util.EnumSet; import java.util.Iterator; import java.util.Map; @@ -349,7 +350,8 @@ private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) { * @throws IOException if any problem occurs while updating filter list. */ private void updateFilterForConfsAndMetricsToRetrieve( - FilterList listBasedOnFields) throws IOException { + FilterList listBasedOnFields, Set cfsInFields) + throws IOException { TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); // Please note that if confsToRetrieve is specified, we would have added // CONFS to fields to retrieve in augmentParams() even if not specified. @@ -359,6 +361,9 @@ private void updateFilterForConfsAndMetricsToRetrieve( .createFilterForConfsOrMetricsToRetrieve( dataToRetrieve.getConfsToRetrieve(), EntityColumnFamily.CONFIGS, EntityColumnPrefix.CONFIG)); + cfsInFields.add( + new String(EntityColumnFamily.CONFIGS.getBytes(), + Charset.forName("UTF-8"))); } // Please note that if metricsToRetrieve is specified, we would have added @@ -369,11 +374,15 @@ private void updateFilterForConfsAndMetricsToRetrieve( .createFilterForConfsOrMetricsToRetrieve( dataToRetrieve.getMetricsToRetrieve(), EntityColumnFamily.METRICS, EntityColumnPrefix.METRIC)); + cfsInFields.add( + new String(EntityColumnFamily.METRICS.getBytes(), + Charset.forName("UTF-8"))); } } @Override - protected FilterList constructFilterListBasedOnFields() throws IOException { + protected FilterList constructFilterListBasedOnFields(Set cfsInFields) + throws IOException { if (!needCreateFilterListBasedOnFields()) { // Fetch all the columns. 
No need of a filter. return null; @@ -394,7 +403,10 @@ protected FilterList constructFilterListBasedOnFields() throws IOException { excludeFieldsFromInfoColFamily(infoColFamilyList); } listBasedOnFields.addFilter(infoColFamilyList); - updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields); + cfsInFields.add( + new String(EntityColumnFamily.INFO.getBytes(), + Charset.forName("UTF-8"))); + updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields, cfsInFields); return listBasedOnFields; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/SubApplicationEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/SubApplicationEntityReader.java index faed348..3fe7c80 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/SubApplicationEntityReader.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/SubApplicationEntityReader.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.reader; import java.io.IOException; +import java.nio.charset.Charset; import java.util.EnumSet; import java.util.Set; @@ -247,7 +248,8 @@ private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) { * @throws IOException if any problem occurs while updating filter list. 
*/ private void updateFilterForConfsAndMetricsToRetrieve( - FilterList listBasedOnFields) throws IOException { + FilterList listBasedOnFields, Set cfsInFields) + throws IOException { TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); // Please note that if confsToRetrieve is specified, we would have added // CONFS to fields to retrieve in augmentParams() even if not specified. @@ -258,6 +260,9 @@ private void updateFilterForConfsAndMetricsToRetrieve( dataToRetrieve.getConfsToRetrieve(), SubApplicationColumnFamily.CONFIGS, SubApplicationColumnPrefix.CONFIG)); + cfsInFields.add( + new String(SubApplicationColumnFamily.CONFIGS.getBytes(), + Charset.forName("UTF-8"))); } // Please note that if metricsToRetrieve is specified, we would have added @@ -269,11 +274,15 @@ private void updateFilterForConfsAndMetricsToRetrieve( dataToRetrieve.getMetricsToRetrieve(), SubApplicationColumnFamily.METRICS, SubApplicationColumnPrefix.METRIC)); + cfsInFields.add( + new String(SubApplicationColumnFamily.METRICS.getBytes(), + Charset.forName("UTF-8"))); } } @Override - protected FilterList constructFilterListBasedOnFields() throws IOException { + protected FilterList constructFilterListBasedOnFields(Set cfsInFields) + throws IOException { if (!needCreateFilterListBasedOnFields()) { // Fetch all the columns. No need of a filter. 
return null; @@ -293,7 +302,10 @@ protected FilterList constructFilterListBasedOnFields() throws IOException { excludeFieldsFromInfoColFamily(infoColFamilyList); } listBasedOnFields.addFilter(infoColFamilyList); - updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields); + cfsInFields.add( + new String(SubApplicationColumnFamily.INFO.getBytes(), + Charset.forName("UTF-8"))); + updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields, cfsInFields); return listBasedOnFields; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java index 3168163..98609cf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.reader; import java.io.IOException; +import java.nio.charset.Charset; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -30,11 +31,16 @@ import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.filter.BinaryComparator; import 
org.apache.hadoop.hbase.filter.BinaryPrefixComparator; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; @@ -122,11 +128,12 @@ protected TimelineEntityReader(TimelineReaderContext ctxt, * results fetched from HBase back-end storage. This is called only for * multiple entity reads. * + * @param cfsInFields column families in the fields * @return a {@link FilterList} object. * @throws IOException if any problem occurs while creating filter list. */ - protected abstract FilterList constructFilterListBasedOnFields() - throws IOException; + protected abstract FilterList constructFilterListBasedOnFields( + Set cfsInFields) throws IOException; /** * Creates a {@link FilterList} based on info, config and metric filters. 
This @@ -151,7 +158,9 @@ private FilterList createFilterList() throws IOException { FilterList listBasedOnFilters = constructFilterListBasedOnFilters(); boolean hasListBasedOnFilters = listBasedOnFilters != null && !listBasedOnFilters.getFilters().isEmpty(); - FilterList listBasedOnFields = constructFilterListBasedOnFields(); + Set<String> cfsInListBasedOnFields = new HashSet<>(0); + FilterList listBasedOnFields = + constructFilterListBasedOnFields(cfsInListBasedOnFields); boolean hasListBasedOnFields = listBasedOnFields != null && !listBasedOnFields.getFilters().isEmpty(); // If filter lists based on both filters and fields can be created, @@ -164,6 +173,21 @@ private FilterList createFilterList() throws IOException { if (hasListBasedOnFilters && hasListBasedOnFields) { FilterList list = new FilterList(); list.addFilter(listBasedOnFilters); + + Set<String> cfsInListBasedOnFilters = new HashSet<>(0); + extractColumnFamiliesFromFiltersBasedOnFilters( + listBasedOnFilters, cfsInListBasedOnFilters); + + // must exclude cfs that are already covered in fields-based filters + // otherwise we will return the whole cf + cfsInListBasedOnFilters.removeAll(cfsInListBasedOnFields); + + if (!cfsInListBasedOnFilters.isEmpty()) { + for (String cf: cfsInListBasedOnFilters) { + listBasedOnFields.addFilter(new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(Bytes.toBytes(cf)))); + } + } list.addFilter(listBasedOnFields); return list; } else if (hasListBasedOnFilters) { @@ -174,6 +198,20 @@ private FilterList createFilterList() throws IOException { return null; } + private static void extractColumnFamiliesFromFiltersBasedOnFilters( + Filter filter, Set<String> columnFamilies) { + if (filter instanceof SingleColumnValueFilter) { + byte[] cf = ((SingleColumnValueFilter) filter).getFamily(); + columnFamilies.add(new String(cf, Charset.forName("UTF-8"))); + } else if (filter instanceof FilterList) { + FilterList filterListBase = (FilterList) filter; + for (Filter fs: filterListBase.getFilters()) { + 
extractColumnFamiliesFromFiltersBasedOnFilters(fs, columnFamilies); + } + } + } + + + protected TimelineDataToRetrieve getDataToRetrieve() { return dataToRetrieve; } @@ -206,7 +249,7 @@ public TimelineEntity readEntity(Configuration hbaseConf, Connection conn) validateParams(); augmentParams(hbaseConf, conn); - FilterList filterList = constructFilterListBasedOnFields(); + FilterList filterList = constructFilterListBasedOnFields(new HashSet<>(0)); if (LOG.isDebugEnabled() && filterList != null) { LOG.debug("FilterList created for get is - " + filterList); }