FLINK-30514

HybridSource savepoint recovery sequence

Details

    Description

      org.apache.flink.connector.base.source.hybrid.HybridSourceReader accumulates splits during recovery in org.apache.flink.connector.base.source.hybrid.HybridSourceReader#restoredSplits.

      As a next step, it creates a reader and pushes all restoredSplits to that reader in org.apache.flink.connector.base.source.hybrid.HybridSourceReader#setCurrentReader.

      The initialization sequence in setCurrentReader is as follows:

      • reader.start()
      • reader.addSplits()

      This does not seem to work when FileSourceReader is the underlying reader.

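      FileSourceReader#start() checks whether any splits are already assigned and calls sendSplitRequest if there are none. The current HybridSourceReader recovery sequence is not aligned with this: start() runs before the restored splits are added, so the reader always looks empty at that point.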
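
The ordering problem can be reproduced with a small stand-alone model. The sketch below does not use any Flink classes: ModelFileReader is a hypothetical stand-in that only mimics the behavior described above (request a split in start() when nothing is assigned yet), and splitRequests is an assumed counter standing in for calls to sendSplitRequest.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a file reader; NOT the Flink FileSourceReader.
// It only models the behavior described in this issue.
final class ModelFileReader {
    private final List<String> assignedSplits = new ArrayList<>();
    int splitRequests = 0; // counts simulated sendSplitRequest() calls

    // Mirrors the described start(): ask for work only if nothing is assigned yet.
    void start() {
        if (assignedSplits.isEmpty()) {
            splitRequests++;
        }
    }

    void addSplits(List<String> splits) {
        assignedSplits.addAll(splits);
    }

    int assignedCount() {
        return assignedSplits.size();
    }
}

final class RecoveryOrderDemo {
    // Models the current recovery order: start() first, restored splits second.
    static ModelFileReader recoverStartFirst(List<String> restoredSplits) {
        ModelFileReader reader = new ModelFileReader();
        reader.start();                   // reader is still empty here
        reader.addSplits(restoredSplits); // restored splits arrive too late
        return reader;
    }

    public static void main(String[] args) {
        ModelFileReader reader =
                recoverStartFirst(Arrays.asList("1000000/part-00000", "1000000/part-00001"));
        System.out.println("restored splits: " + reader.assignedCount());
        System.out.println("split requests sent: " + reader.splitRequests);
    }
}
```

In this model the reader ends up holding its restored splits and has also asked for new ones, which matches the pattern in the logs: restored splits are added, and fresh splits are assigned shortly afterwards.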

      As a result, every recovery immediately jumps to the next set of splits.
      The logs below illustrate this. In this experiment the job should have resumed with files in the 1000000 bucket, but it jumped to bucket 2000000 instead.
      Job Manager

      2022-12-27 13:38:47.155 StaticFileSplitEnumerator  - Assigned split to subtask 1 : FileSourceSplit: s3a://bucket/2000000/part-00001-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97489087)  hosts=[localhost] ID=0000000032 position=null
      2022-12-27 13:38:47.156 StaticFileSplitEnumerator  - Assigned split to subtask 9 : FileSourceSplit: s3a://bucket/2000000/part-00002-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97342071)  hosts=[localhost] ID=0000000033 position=null
      2022-12-27 13:38:47.156 StaticFileSplitEnumerator  - Assigned split to subtask 6 : FileSourceSplit: s3a://bucket/2000000/part-00000-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97377047)  hosts=[localhost] ID=0000000031 position=null
      2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 5 : FileSourceSplit: s3a://bucket/2000000/part-00003-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97406878)  hosts=[localhost] ID=0000000034 position=null
      2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 2 : FileSourceSplit: s3a://bucket/2000000/part-00009-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97536205)  hosts=[localhost] ID=0000000040 position=null
      2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 4 : FileSourceSplit: s3a://bucket/2000000/part-00004-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97420601)  hosts=[localhost] ID=0000000035 position=null
      2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 8 : FileSourceSplit: s3a://bucket/2000000/part-00005-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97472256)  hosts=[localhost] ID=0000000036 position=null
      2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 3 : FileSourceSplit: s3a://bucket/2000000/part-00006-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97495880)  hosts=[localhost] ID=0000000037 position=null
      2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to subtask 0 : FileSourceSplit: s3a://bucket/2000000/part-00007-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97389425)  hosts=[localhost] ID=0000000038 position=null
      2022-12-27 13:38:47.158 StaticFileSplitEnumerator  - Assigned split to subtask 7 : FileSourceSplit: s3a://bucket/2000000/part-00008-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97428709)  hosts=[localhost] ID=0000000039 position=null
      

      Task Manager

      2246:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00007-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79887236)  hosts=[localhost] ID=0000000018 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226029]
      2247:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00000-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79987191)  hosts=[localhost] ID=0000000011 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226030]
      2248:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00009-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80247830)  hosts=[localhost] ID=0000000020 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226535]
      2249:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00004-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80055663)  hosts=[localhost] ID=0000000015 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226712]
      2250:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00005-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80022187)  hosts=[localhost] ID=0000000016 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226346]
      2251:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00006-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80109242)  hosts=[localhost] ID=0000000017 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=227284]
      2252:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00001-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79980911)  hosts=[localhost] ID=0000000012 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226429]
      2253:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00003-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79996693)  hosts=[localhost] ID=0000000014 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=227154]
      2254:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00002-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80040476)  hosts=[localhost] ID=0000000013 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=225920]
      2255:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00008-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79986997)  hosts=[localhost] ID=0000000019 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226278]
      2265:2022-12-27 13:38:47.115 FileSourceSplitReader  - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00006-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80109242)  hosts=[localhost] ID=0000000017 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=227284]]
      2266:2022-12-27 13:38:47.115 FileSourceSplitReader  - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00004-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80055663)  hosts=[localhost] ID=0000000015 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226712]]
      2267:2022-12-27 13:38:47.115 FileSourceSplitReader  - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00005-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80022187)  hosts=[localhost] ID=0000000016 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226346]]
      2268:2022-12-27 13:38:47.115 FileSourceSplitReader  - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00003-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79996693)  hosts=[localhost] ID=0000000014 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=227154]]
      2269:2022-12-27 13:38:47.115 FileSourceSplitReader  - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00009-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80247830)  hosts=[localhost] ID=0000000020 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226535]]
      2270:2022-12-27 13:38:47.115 FileSourceSplitReader  - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00002-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80040476)  hosts=[localhost] ID=0000000013 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=225920]]
      2271:2022-12-27 13:38:47.115 FileSourceSplitReader  - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00007-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79887236)  hosts=[localhost] ID=0000000018 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226029]]
      2272:2022-12-27 13:38:47.115 FileSourceSplitReader  - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00001-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79980911)  hosts=[localhost] ID=0000000012 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226429]]
      2273:2022-12-27 13:38:47.115 FileSourceSplitReader  - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00008-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79986997)  hosts=[localhost] ID=0000000019 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226278]]
      2275:2022-12-27 13:38:47.116 FileSourceSplitReader  - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00000-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79987191)  hosts=[localhost] ID=0000000011 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226030]]
      2281:2022-12-27 13:38:47.160 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00009-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97536205)  hosts=[localhost] ID=0000000040 position=null]
      2282:2022-12-27 13:38:47.160 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00001-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97489087)  hosts=[localhost] ID=0000000032 position=null]
      2283:2022-12-27 13:38:47.159 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00002-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97342071)  hosts=[localhost] ID=0000000033 position=null]
      2284:2022-12-27 13:38:47.160 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00000-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97377047)  hosts=[localhost] ID=0000000031 position=null]
      2285:2022-12-27 13:38:47.160 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00003-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97406878)  hosts=[localhost] ID=0000000034 position=null]
      2288:2022-12-27 13:38:47.161 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00005-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97472256)  hosts=[localhost] ID=0000000036 position=null]
      2289:2022-12-27 13:38:47.161 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00004-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97420601)  hosts=[localhost] ID=0000000035 position=null]
      2292:2022-12-27 13:38:47.162 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00006-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97495880)  hosts=[localhost] ID=0000000037 position=null]
      2293:2022-12-27 13:38:47.163 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00007-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97389425)  hosts=[localhost] ID=0000000038 position=null]
      2295:2022-12-27 13:38:47.163 SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00008-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97428709)  hosts=[localhost] ID=0000000039 position=null]
      

      The same logs are available as a GitHub gist: https://gist.github.com/WonderBeat/ddfdc852556997b09451d48766b54183

      This could be fixed with a simple reordering in HybridSourceReader#setCurrentReader: calling reader.addSplits() before reader.start() sounds logical. What do you think?
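
A minimal sketch of the proposed reordering, again with a hypothetical stand-in (StubReader) rather than real Flink classes. It assumes the underlying reader tolerates addSplits() being called before start(), which is not verified here for actual SourceReader implementations.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in reader; NOT a Flink class.
final class StubReader {
    private final List<String> assigned = new ArrayList<>();
    int splitRequests = 0; // counts simulated sendSplitRequest() calls

    void addSplits(List<String> splits) {
        assigned.addAll(splits);
    }

    // Requests a split only when nothing has been assigned yet.
    void start() {
        if (assigned.isEmpty()) {
            splitRequests++;
        }
    }
}

final class ReorderedRecoveryDemo {
    // Proposed order: hand over restored splits first, then start the reader.
    static StubReader recoverSplitsFirst(List<String> restoredSplits) {
        StubReader reader = new StubReader();
        reader.addSplits(restoredSplits);
        reader.start();
        return reader;
    }

    public static void main(String[] args) {
        StubReader restored = recoverSplitsFirst(Arrays.asList("1000000/part-00000"));
        StubReader fresh = recoverSplitsFirst(Collections.<String>emptyList());
        System.out.println("requests after restore: " + restored.splitRequests);
        System.out.println("requests on fresh start: " + fresh.splitRequests);
    }
}
```

With this order the model sends no split request when restored splits exist, while a fresh start with no restored splits still requests work as before.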

            People

              Assignee: Unassigned
              Reporter: Denis Golovachev (WonderBeat)