Flink / FLINK-29039

RowData produced by LineBytesInputFormat is reused, but DeserializationSchemaAdapter#Reader only shallow-copies the produced data, so every result row holds the last row's value


Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 1.15.1
    • None
    • None
    • This issue was discovered on macOS Big Sur.

    Description

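      LineBytesInputFormat reuses the RowData it produces for every line it reads, but DeserializationSchemaAdapter#Reader only shallow-copies the records it receives. The emitted rows therefore still share the reused data, and every row in the result ends up holding the value of the last row read.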

      Given this program:

      package mvillalobos.bug;

      import org.apache.flink.api.common.RuntimeExecutionMode;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.TableResult;
      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

      import static org.apache.flink.table.api.Expressions.$;

      public class IsThisABatchSQLBug {
          public static void main(String[] args) {
              // Run the pipeline in bounded (batch) execution mode.
              final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setRuntimeMode(RuntimeExecutionMode.BATCH);
              final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

              // Template table: filesystem connector with the 'raw' format, which yields one
              // STRING row per line, plus the file metadata columns.
              tableEnv.executeSql("CREATE TABLE historical_raw_source_template(\n" +
                      "        `file.path`              STRING NOT NULL METADATA,\n" +
                      "        `file.name`              STRING NOT NULL METADATA,\n" +
                      "        `file.size`              BIGINT NOT NULL METADATA,\n" +
                      "        `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA,\n" +
                      "        line                    STRING\n" +
                      "      ) WITH (\n" +
                      "        'connector' = 'filesystem', \n" +
                      "        'format' = 'raw'\n" +
                      "      );");

              // Concrete table that points at the data directory and inherits the template's schema and options.
              tableEnv.executeSql("CREATE TABLE historical_raw_source\n" +
                      "      WITH (\n" +
                      "        'path' = '/Users/minmay/dev/mvillalobos/historical/data'\n" +
                      "      ) LIKE historical_raw_source_template;");

              // Select only the line column and print every row.
              final TableResult output = tableEnv.from("historical_raw_source").select($("line")).execute();
              output.print();
          }
      }

      and this sample.csv file in the '/Users/minmay/dev/mvillalobos/historical/data' directory:

      one
      two
      three
      four
      five
      six
      seven
      eight
      nine
      ten 

      The printed result has ten rows, but every row contains the last line, ten, instead of the lines one through ten:

      +----+--------------------------------+
      | +I |                            ten |
      | +I |                            ten |
      | +I |                            ten |
      | +I |                            ten |
      | +I |                            ten |
      | +I |                            ten |
      | +I |                            ten |
      | +I |                            ten |
      | +I |                            ten |
      | +I |                            ten |
      +----+--------------------------------+
      10 rows in set 
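      The all-"ten" output is what object reuse combined with a shallow copy produces when records are buffered before their values are read. The following is a minimal plain-Java sketch of that pattern, not Flink code: MutableRow is a hypothetical stand-in for the reused RowData, the "shallow" path is modeled as keeping the reference to the reused object, and the assumption (implied by the report's title) is that the reader collects several records before they are consumed.

```java
import java.util.ArrayList;
import java.util.List;

public class ReuseDemo {

    // Hypothetical stand-in for a reused, mutable row with a single STRING field.
    static final class MutableRow {
        private String line;

        void setLine(String line) { this.line = line; }

        String getLine() { return line; }

        // Deep copy: a new object that no longer shares state with the reused instance.
        MutableRow copy() {
            MutableRow c = new MutableRow();
            c.setLine(line);
            return c;
        }
    }

    static final String[] LINES = {
        "one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten"
    };

    // Simulates a line reader that refills ONE row object for every record, and a
    // downstream reader that either keeps that reference (shallow) or copies it (deep).
    static List<String> collect(boolean deepCopy) {
        MutableRow reused = new MutableRow();
        List<MutableRow> buffered = new ArrayList<>();
        for (String line : LINES) {
            reused.setLine(line); // the same instance is overwritten for each record
            buffered.add(deepCopy ? reused.copy() : reused);
        }
        // Values are read only after all records were buffered, which exposes the sharing.
        List<String> result = new ArrayList<>();
        for (MutableRow row : buffered) {
            result.add(row.getLine());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("shallow: " + collect(false)); // every entry is "ten"
        System.out.println("deep:    " + collect(true));  // "one" through "ten"
    }
}
```

      With the shallow path all ten buffered entries point at one object, so they all report its final value; copying each record before buffering it restores the ten distinct lines.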

       

      Attachments

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            Assignee: Unassigned
            Reporter: Marco A. Villalobos (mvillalobos)

            Dates

              Created:
              Updated:

              Slack

                Issue deployment