Flink / FLINK-12399

FilterableTableSource does not use filters on job run

      Description

      As discussed on the mailing list, a job that uses a custom FilterableTableSource appears to lose the filters that were pushed down into the table source. More specifically, the table source receives the filters via applyPredicate and returns a new table source that contains them, but the final job graph appears to use the original table source, which contains no filters.

      I attached a minimal example program to this ticket. The custom table source is as follows:

      public class CustomTableSource implements BatchTableSource<Model>, FilterableTableSource<Model> {
      
          private static final Logger LOG = LoggerFactory.getLogger(CustomTableSource.class);
      
          private final Filter[] filters;
      
          private final FilterConverter converter = new FilterConverter();
      
          public CustomTableSource() {
              this(null);
          }
      
          private CustomTableSource(Filter[] filters) {
              this.filters = filters;
          }
      
          @Override
          public DataSet<Model> getDataSet(ExecutionEnvironment execEnv) {
              if (filters == null) {
                  LOG.info("==== No filters defined ====");
              } else {
                  LOG.info("==== Found filters ====");
                  for (Filter filter : filters) {
                      LOG.info("FILTER: {}", filter);
                  }
              }
      
              return execEnv.fromCollection(allModels());
          }
      
          @Override
          public TableSource<Model> applyPredicate(List<Expression> predicates) {
              LOG.info("Applying predicates");
      
              List<Filter> acceptedFilters = new ArrayList<>();
              for (final Expression predicate : predicates) {
                  converter.convert(predicate).ifPresent(acceptedFilters::add);
              }
      
              return new CustomTableSource(acceptedFilters.toArray(new Filter[0]));
          }
      
          @Override
          public boolean isFilterPushedDown() {
              return filters != null;
          }
      
          @Override
          public TypeInformation<Model> getReturnType() {
              return TypeInformation.of(Model.class);
          }
      
          @Override
          public TableSchema getTableSchema() {
              return TableSchema.fromTypeInfo(getReturnType());
          }
      
          private List<Model> allModels() {
              List<Model> models = new ArrayList<>();
      
              models.add(new Model(1, 2, 3, 4));
              models.add(new Model(10, 11, 12, 13));
              models.add(new Model(20, 21, 22, 23));
      
              return models;
          }
      }

      When run, it logs:

      15:24:54,888 INFO  com.klaviyo.filterbug.CustomTableSource                       - Applying predicates
      15:24:54,901 INFO  com.klaviyo.filterbug.CustomTableSource                       - Applying predicates
      15:24:54,910 INFO  com.klaviyo.filterbug.CustomTableSource                       - Applying predicates
      15:24:54,977 INFO  com.klaviyo.filterbug.CustomTableSource                       - ==== No filters defined ====

      This appears to indicate that although applyPredicate is called three times and the filters are pushed down, the final job calls getDataSet on a table source that does not have them.
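
      The symptom can be mimicked without Flink. The following is a minimal sketch in plain Java, not Flink code: Source, planDroppingResult, planKeepingResult, and the predicate strings are all hypothetical names invented for illustration. It only shows why a caller must continue with the instance returned by applyPredicate when that method returns a copy; it makes no claim about where or why Flink's planner ends up with the original instance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy model only: none of these types are Flink classes.
public class FilterPushdownSketch {

    /** Hypothetical stand-in for a filterable source; filters == null means nothing was pushed down. */
    static final class Source {
        private final List<String> filters;

        Source() {
            this(null);
        }

        private Source(List<String> filters) {
            this.filters = filters;
        }

        /** Returns a NEW source carrying the predicates; this instance is left unchanged. */
        Source applyPredicate(List<String> predicates) {
            return new Source(Collections.unmodifiableList(new ArrayList<>(predicates)));
        }

        boolean isFilterPushedDown() {
            return filters != null;
        }

        String describe() {
            return isFilterPushedDown() ? "filters=" + filters : "no filters";
        }
    }

    /** Hypothetical caller that offers the predicates but keeps the original reference. */
    static Source planDroppingResult(Source source, List<String> predicates) {
        source.applyPredicate(predicates); // return value discarded
        return source;
    }

    /** Hypothetical caller that continues with the instance returned by applyPredicate. */
    static Source planKeepingResult(Source source, List<String> predicates) {
        return source.applyPredicate(predicates);
    }

    public static void main(String[] args) {
        List<String> predicates = Arrays.asList("a > 5", "b < 20");
        Source original = new Source();

        // Mirrors the reported log: predicates were applied, yet the source in use has none.
        System.out.println(planDroppingResult(original, predicates).describe());

        // The expected behaviour: the filtered copy is the one that gets used.
        System.out.println(planKeepingResult(original, predicates).describe());
    }
}
```

      In this toy model the first call leaves the source reporting "no filters" even though applyPredicate ran, which is the same pattern as the log above: "Applying predicates" followed by "No filters defined".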

        Attachments

        1. flink-filter-bug.tar.gz (3 kB, Josh Bradt)

              People

              • Assignee: Rong Rong (rongr)
              • Reporter: Josh Bradt (josh.bradt)
              • Votes: 1
              • Watchers: 5

                  Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 40m