FLINK-30899

FileSystemTableSource with CSV format incorrectly selects fields if filtering for partition

    Description

      In my testing, this affected only the `csv` and `testcsv` formats.

       

      I think the cause is that `FileSystemTableSource` calls `DeserializationFormatFactory#createRuntimeDecoder` with the wrong `physicalDataType`. The files do not contain the partition field values, but after a projection pushdown (which can happen during the planning phase when the partition field is filtered by a constant value), the final `physicalDataType` that `FileSystemTableSource` passes to the deserializer still contains the partition fields. As described in `DecodingFormat`, every field in the `physicalDataType` parameter must be present in the serialized record.
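To illustrate why extra fields in the schema matter, here is a toy model of a positional decoder. It is not Flink's CSV implementation: `PositionalDecodeSketch` and `decode` are hypothetical names, and the input line `7,10` is an assumption about what the file under the `f0=1/f1=4` partition from the example below would contain.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PositionalDecodeSketch {

    // Hypothetical helper: assigns the i-th token of the line to the i-th
    // schema field; fields without a matching token are left as null.
    static Map<String, String> decode(String line, List<String> schema) {
        String[] tokens = line.split(",", -1);
        Map<String, String> row = new LinkedHashMap<>();
        for (int i = 0; i < schema.size(); i++) {
            row.put(schema.get(i), i < tokens.length ? tokens[i] : null);
        }
        return row;
    }

    public static void main(String[] args) {
        // Assumed file content: only the non-partition columns f2 and f3.
        String line = "7,10";

        // Schema that still includes the partition fields: values shift left.
        System.out.println(decode(line, List.of("f0", "f1", "f2", "f3")));
        // prints {f0=7, f1=10, f2=null, f3=null}

        // Schema without the partition fields: values land where expected.
        System.out.println(decode(line, List.of("f2", "f3")));
        // prints {f2=7, f3=10}
    }
}
```

The sketch only shows the general failure mode of a position-based format; it does not attempt to reproduce the exact wrong values or the exception reported below.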

       

      Example:

      CREATE TABLE test_table (
        f0 INT,
        f1 INT,
        f2 INT,
        f3 INT
      ) PARTITIONED BY (f0,f1) WITH (
        'connector' = 'filesystem',
        'path' = 'file:///path/to/whatever',
        'format' = 'csv'
      );
      
      SELECT * FROM test_table WHERE f0 = 1;
      -- Expected row: 1, 4, 7, 10. The actual (wrong) result is:
      +-------------+-------------+-------------+-------------+
      |          f0 |          f1 |          f2 |          f3 |
      +-------------+-------------+-------------+-------------+
      |           1 |           4 |          10 |           0 |
      +-------------+-------------+-------------+-------------+
      
      SELECT * FROM test_table;
      +-------------+-------------+-------------+-------------+
      |          f0 |          f1 |          f2 |          f3 |
      +-------------+-------------+-------------+-------------+
      |           2 |           5 |           8 |          11 |
      |           1 |           4 |           7 |          10 |
      |           3 |           6 |           9 |          12 |
      +-------------+-------------+-------------+-------------+
      
      SELECT * FROM test_table WHERE f0>0;
      +-------------+-------------+-------------+-------------+
      |          f0 |          f1 |          f2 |          f3 |
      +-------------+-------------+-------------+-------------+
      |           1 |           4 |           7 |          10 |
      |           3 |           6 |           9 |          12 |
      |           2 |           5 |           8 |          11 |
      +-------------+-------------+-------------+-------------+
      
      SELECT * FROM test_table WHERE f0 = 1 AND f1 = 4;
      ...
      Caused by: java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 4
          at org.apache.flink.types.parser.IntParser.parseField(IntParser.java:49)
          at org.apache.flink.types.parser.IntParser.parseField(IntParser.java:27)
          at org.apache.flink.types.parser.FieldParser.resetErrorStateAndParse(FieldParser.java:101)
          at org.apache.flink.formats.testcsv.TestCsvDeserializationSchema.deserialize(TestCsvDeserializationSchema.java:92)
          at org.apache.flink.formats.testcsv.TestCsvDeserializationSchema.deserialize(TestCsvDeserializationSchema.java:42)
          at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
      ... 

      At https://github.com/apache/flink/blob/b1e70aebd3e248d68cf41a43db385ec9c9b6235a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java#L147 the `physicalRowDataType` contains the partition fields as well, but `partitionKeysToExtract` does not contain them, because `producedDataType` has been modified in the `applyProjection` method, so the resulting projection is empty. Then on line 154 we construct the final `physicalDataType`, but since `partitionKeysProjections` is empty, it keeps its initial value, which still contains the partition fields.

      By changing

       final Projection partitionKeysProjections = Projection.fromFieldNames(physicalDataType, partitionKeysToExtract);

      to

       final Projection partitionKeysProjections = Projection.fromFieldNames(physicalDataType, partitionKeys);

      the issue can be solved. I have verified this solution with one and two partition keys, with and without metadata columns, and with and without virtual columns. I still need to test this change with other formats.
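The effect of the change can be sketched with plain lists of field names. This is a simplified model and does not use Flink's `Projection` or `DataType` classes; `PhysicalTypeSketch`, `keysToExtract`, and `physicalFields` are hypothetical names, and the contents of `producedFields` are an assumption about the state after `applyProjection` when both partition columns are filtered by constants.

```java
import java.util.ArrayList;
import java.util.List;

public class PhysicalTypeSketch {

    // Models partitionKeysToExtract: the partition keys that are still
    // present in the produced (projected) type.
    static List<String> keysToExtract(List<String> producedFields, List<String> partitionKeys) {
        List<String> result = new ArrayList<>(producedFields);
        result.retainAll(partitionKeys);
        return result;
    }

    // Models the final physicalDataType: the row fields minus the given
    // keys, in their original order.
    static List<String> physicalFields(List<String> rowFields, List<String> keysToRemove) {
        List<String> result = new ArrayList<>(rowFields);
        result.removeAll(keysToRemove);
        return result;
    }

    public static void main(String[] args) {
        List<String> rowFields = List.of("f0", "f1", "f2", "f3");
        List<String> partitionKeys = List.of("f0", "f1");
        // Assumption: the pushed-down projection dropped both partition columns.
        List<String> producedFields = List.of("f2", "f3");

        // Current behaviour: nothing to extract, so nothing is removed.
        List<String> extract = keysToExtract(producedFields, partitionKeys);
        System.out.println("current:  " + physicalFields(rowFields, extract));
        // prints current:  [f0, f1, f2, f3]

        // Proposed behaviour: always remove every partition key.
        System.out.println("proposed: " + physicalFields(rowFields, partitionKeys));
        // prints proposed: [f2, f3]
    }
}
```

In this model the proposed variant yields a physical type that matches what the files are described as containing, independent of what the projection left in the produced type.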

       

      If this solution seems correct and a committer could assign the ticket to me, I can start working on it.

          People

            mateczagany Mate Czagany