diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java index f4cd6fb..a97e95a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java @@ -19,6 +19,7 @@ import java.net.MalformedURLException; import java.net.URL; +import java.nio.charset.Charset; import java.util.Arrays; import org.apache.hadoop.conf.Configuration; @@ -37,6 +38,8 @@ private static final Logger LOG = LoggerFactory.getLogger(HBaseTimelineStorageUtils.class); + private static final Charset UTF8_ENCODING = Charset.forName("UTF-8"); + private HBaseTimelineStorageUtils() { } @@ -118,4 +121,22 @@ public static void setMetricsTimeRange(Query query, byte[] metricsCf, tsBegin, ((tsEnd == Long.MAX_VALUE) ? Long.MAX_VALUE : (tsEnd + 1))); } } + + /** + * Convert a byte array to a String. + * @param bytes bytes to convert + * @return a String + */ + public static String convertBytesToString(byte[] bytes) { + return new String(bytes, UTF8_ENCODING); + } + + /** + * Convert a string to a byte array. 
+ * @param string the string to convert + * @return a byte array + */ + public static byte[] convertStringToBytes(String string) { + return string.getBytes(UTF8_ENCODING); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java index 7440316..42df6aa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java @@ -258,7 +258,8 @@ private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) { * @throws IOException if any problem occurs while updating filter list. */ private void updateFilterForConfsAndMetricsToRetrieve( - FilterList listBasedOnFields) throws IOException { + FilterList listBasedOnFields, Set<String> cfsInFields) + throws IOException { TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); // Please note that if confsToRetrieve is specified, we would have added // CONFS to fields to retrieve in augmentParams() even if not specified. 
@@ -268,6 +269,9 @@ private void updateFilterForConfsAndMetricsToRetrieve( createFilterForConfsOrMetricsToRetrieve( dataToRetrieve.getConfsToRetrieve(), ApplicationColumnFamily.CONFIGS, ApplicationColumnPrefix.CONFIG)); + cfsInFields.add( + HBaseTimelineStorageUtils.convertBytesToString( + ApplicationColumnFamily.CONFIGS.getBytes())); } // Please note that if metricsToRetrieve is specified, we would have added @@ -278,11 +282,15 @@ private void updateFilterForConfsAndMetricsToRetrieve( createFilterForConfsOrMetricsToRetrieve( dataToRetrieve.getMetricsToRetrieve(), ApplicationColumnFamily.METRICS, ApplicationColumnPrefix.METRIC)); + cfsInFields.add( + HBaseTimelineStorageUtils.convertBytesToString( + ApplicationColumnFamily.METRICS.getBytes())); } } @Override - protected FilterList constructFilterListBasedOnFields() throws IOException { + protected FilterList constructFilterListBasedOnFields(Set<String> cfsInFields) + throws IOException { if (!needCreateFilterListBasedOnFields()) { // Fetch all the columns. No need of a filter. 
return null; @@ -303,8 +311,11 @@ protected FilterList constructFilterListBasedOnFields() throws IOException { excludeFieldsFromInfoColFamily(infoColFamilyList); } listBasedOnFields.addFilter(infoColFamilyList); + cfsInFields.add( + HBaseTimelineStorageUtils.convertBytesToString( + ApplicationColumnFamily.INFO.getBytes())); - updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields); + updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields, cfsInFields); return listBasedOnFields; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java index d0a0f3b..7b7eef5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; @@ -97,7 +98,8 @@ protected FilterList constructFilterListBasedOnFilters() throws IOException { } @Override - protected FilterList constructFilterListBasedOnFields() { + protected FilterList constructFilterListBasedOnFields( + Set<String> cfsInFields) { return null; } diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java index 33a2cf6..68e8dd4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.EnumSet; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; @@ -152,7 +153,8 @@ private FilterList updateFixedColumns() { } @Override - protected FilterList constructFilterListBasedOnFields() throws IOException { + protected FilterList constructFilterListBasedOnFields( + Set<String> cfsInFields) throws IOException { FilterList list = new FilterList(Operator.MUST_PASS_ONE); // By default fetch everything in INFO column family. 
FamilyFilter infoColumnFamily = @@ -166,6 +168,9 @@ protected FilterList constructFilterListBasedOnFields() throws IOException { && !hasField(dataToRetrieve.getFieldsToRetrieve(), Field.METRICS)) { FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE); infoColFamilyList.addFilter(infoColumnFamily); + cfsInFields.add( + HBaseTimelineStorageUtils.convertBytesToString( + FlowRunColumnFamily.INFO.getBytes())); infoColFamilyList.addFilter(new QualifierFilter(CompareOp.NOT_EQUAL, new BinaryPrefixComparator(FlowRunColumnPrefix.METRIC .getColumnPrefixBytes("")))); @@ -182,6 +187,9 @@ protected FilterList constructFilterListBasedOnFields() throws IOException { && !metricsToRetrieve.getFilterList().isEmpty()) { FilterList infoColFamilyList = new FilterList(); infoColFamilyList.addFilter(infoColumnFamily); + cfsInFields.add( + HBaseTimelineStorageUtils.convertBytesToString( + FlowRunColumnFamily.INFO.getBytes())); FilterList columnsList = updateFixedColumns(); columnsList.addFilter(TimelineFilterUtils.createHBaseFilterList( FlowRunColumnPrefix.METRIC, metricsToRetrieve)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java index 02eca84..1d196e3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java @@ -349,7 +349,8 @@ private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) { * @throws IOException if any problem occurs while updating filter list. */ private void updateFilterForConfsAndMetricsToRetrieve( - FilterList listBasedOnFields) throws IOException { + FilterList listBasedOnFields, Set<String> cfsInFields) + throws IOException { TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); // Please note that if confsToRetrieve is specified, we would have added // CONFS to fields to retrieve in augmentParams() even if not specified. @@ -359,6 +360,9 @@ private void updateFilterForConfsAndMetricsToRetrieve( .createFilterForConfsOrMetricsToRetrieve( dataToRetrieve.getConfsToRetrieve(), EntityColumnFamily.CONFIGS, EntityColumnPrefix.CONFIG)); + cfsInFields.add( + HBaseTimelineStorageUtils.convertBytesToString( + EntityColumnFamily.CONFIGS.getBytes())); } // Please note that if metricsToRetrieve is specified, we would have added @@ -369,11 +373,15 @@ private void updateFilterForConfsAndMetricsToRetrieve( .createFilterForConfsOrMetricsToRetrieve( dataToRetrieve.getMetricsToRetrieve(), EntityColumnFamily.METRICS, EntityColumnPrefix.METRIC)); + cfsInFields.add( + HBaseTimelineStorageUtils.convertBytesToString( + EntityColumnFamily.METRICS.getBytes())); } } @Override - protected FilterList constructFilterListBasedOnFields() throws IOException { + protected FilterList constructFilterListBasedOnFields(Set<String> cfsInFields) + throws IOException { if (!needCreateFilterListBasedOnFields()) { // Fetch all the columns. No need of a filter. 
return null; @@ -394,7 +402,10 @@ protected FilterList constructFilterListBasedOnFields() throws IOException { excludeFieldsFromInfoColFamily(infoColFamilyList); } listBasedOnFields.addFilter(infoColFamilyList); - updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields); + cfsInFields.add( + HBaseTimelineStorageUtils.convertBytesToString( + EntityColumnFamily.INFO.getBytes())); + updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields, cfsInFields); return listBasedOnFields; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/SubApplicationEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/SubApplicationEntityReader.java index faed348..c0fb767 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/SubApplicationEntityReader.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/SubApplicationEntityReader.java @@ -247,7 +247,8 @@ private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) { * @throws IOException if any problem occurs while updating filter list. 
*/ private void updateFilterForConfsAndMetricsToRetrieve( - FilterList listBasedOnFields) throws IOException { + FilterList listBasedOnFields, Set<String> cfsInFields) + throws IOException { TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); // Please note that if confsToRetrieve is specified, we would have added // CONFS to fields to retrieve in augmentParams() even if not specified. @@ -258,6 +259,9 @@ private void updateFilterForConfsAndMetricsToRetrieve( dataToRetrieve.getConfsToRetrieve(), SubApplicationColumnFamily.CONFIGS, SubApplicationColumnPrefix.CONFIG)); + cfsInFields.add( + HBaseTimelineStorageUtils.convertBytesToString( + SubApplicationColumnFamily.CONFIGS.getBytes())); } // Please note that if metricsToRetrieve is specified, we would have added @@ -269,11 +273,15 @@ private void updateFilterForConfsAndMetricsToRetrieve( dataToRetrieve.getMetricsToRetrieve(), SubApplicationColumnFamily.METRICS, SubApplicationColumnPrefix.METRIC)); + cfsInFields.add( + HBaseTimelineStorageUtils.convertBytesToString( + SubApplicationColumnFamily.METRICS.getBytes())); } } @Override - protected FilterList constructFilterListBasedOnFields() throws IOException { + protected FilterList constructFilterListBasedOnFields(Set<String> cfsInFields) + throws IOException { if (!needCreateFilterListBasedOnFields()) { // Fetch all the columns. No need of a filter. 
return null; @@ -293,7 +301,10 @@ protected FilterList constructFilterListBasedOnFields() throws IOException { excludeFieldsFromInfoColFamily(infoColFamilyList); } listBasedOnFields.addFilter(infoColFamilyList); - updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields); + cfsInFields.add( + HBaseTimelineStorageUtils.convertBytesToString( + SubApplicationColumnFamily.INFO.getBytes())); + updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields, cfsInFields); return listBasedOnFields; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java index 3168163..f8db278 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java @@ -30,11 +30,15 @@ import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.Filter; import 
org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; @@ -49,6 +53,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnRWHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName; import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; @@ -122,11 +127,12 @@ protected TimelineEntityReader(TimelineReaderContext ctxt, * results fetched from HBase back-end storage. This is called only for * multiple entity reads. * + * @param cfsInFields column families in the fields * @return a {@link FilterList} object. * @throws IOException if any problem occurs while creating filter list. */ - protected abstract FilterList constructFilterListBasedOnFields() - throws IOException; + protected abstract FilterList constructFilterListBasedOnFields( + Set<String> cfsInFields) throws IOException; /** * Creates a {@link FilterList} based on info, config and metric filters. 
This @@ -151,7 +157,9 @@ private FilterList createFilterList() throws IOException { FilterList listBasedOnFilters = constructFilterListBasedOnFilters(); boolean hasListBasedOnFilters = listBasedOnFilters != null && !listBasedOnFilters.getFilters().isEmpty(); - FilterList listBasedOnFields = constructFilterListBasedOnFields(); + Set<String> cfsInListBasedOnFields = new HashSet<>(0); + FilterList listBasedOnFields = + constructFilterListBasedOnFields(cfsInListBasedOnFields); boolean hasListBasedOnFields = listBasedOnFields != null && !listBasedOnFields.getFilters().isEmpty(); // If filter lists based on both filters and fields can be created, @@ -164,6 +172,22 @@ private FilterList createFilterList() throws IOException { if (hasListBasedOnFilters && hasListBasedOnFields) { FilterList list = new FilterList(); list.addFilter(listBasedOnFilters); + + Set<String> cfsInListBasedOnFilters = new HashSet<>(0); + extractColumnFamiliesFromFiltersBasedOnFilters( + listBasedOnFilters, cfsInListBasedOnFilters); + + // must exclude cfs that are already covered in fields-based filters + // otherwise we will return the whole cf + cfsInListBasedOnFilters.removeAll(cfsInListBasedOnFields); + + if (!cfsInListBasedOnFilters.isEmpty()) { + for (String cf: cfsInListBasedOnFilters) { + listBasedOnFields.addFilter(new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator( + HBaseTimelineStorageUtils.convertStringToBytes(cf)))); + } + } list.addFilter(listBasedOnFields); return list; } else if (hasListBasedOnFilters) { @@ -174,6 +198,21 @@ private FilterList createFilterList() throws IOException { return null; } + private static void extractColumnFamiliesFromFiltersBasedOnFilters( + Filter hbaseFilterBasedOnTLSFilter, Set<String> columnFamilies) { + if (hbaseFilterBasedOnTLSFilter instanceof SingleColumnValueFilter) { + byte[] cf = ((SingleColumnValueFilter) + hbaseFilterBasedOnTLSFilter).getFamily(); + columnFamilies.add(HBaseTimelineStorageUtils.convertBytesToString(cf)); + } else if (hbaseFilterBasedOnTLSFilter 
instanceof FilterList) { + FilterList filterListBase = (FilterList) hbaseFilterBasedOnTLSFilter; + for (Filter fs: filterListBase.getFilters()) { + extractColumnFamiliesFromFiltersBasedOnFilters(fs, columnFamilies); + } + } + } + + protected TimelineDataToRetrieve getDataToRetrieve() { return dataToRetrieve; } @@ -206,7 +245,7 @@ public TimelineEntity readEntity(Configuration hbaseConf, Connection conn) validateParams(); augmentParams(hbaseConf, conn); - FilterList filterList = constructFilterListBasedOnFields(); + FilterList filterList = constructFilterListBasedOnFields(new HashSet<>(0)); if (LOG.isDebugEnabled() && filterList != null) { LOG.debug("FilterList created for get is - " + filterList); }