Details
Type: Bug
Status: Open
Priority: Minor
Resolution: Unresolved
Affects Version/s: 1.16.0, 1.15.2, 1.15.3
Fix Version/s: None
Description
org.apache.flink.connector.base.source.hybrid.HybridSourceReader accumulates splits during recovery in
org.apache.flink.connector.base.source.hybrid.HybridSourceReader#restoredSplits.
Next, it creates a reader and pushes all restored splits to that reader in
org.apache.flink.connector.base.source.hybrid.HybridSourceReader#setCurrentReader.
The call sequence in setCurrentReader is as follows:
- reader.start()
- reader.addSplits()
This does not seem to work when FileSourceReader is the underlying reader.
FileSourceReader#start() checks whether any splits are available to read and, if there are none, calls sendSplitRequest. The current HybridSourceReader recovery sequence does not account for this: when start() runs, the restored splits have not been added yet, so the reader requests new splits.
As a result, every time we recover, we immediately jump to the next splits.
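The effect of the ordering can be illustrated with a toy model. The sketch below is not Flink code: ToyFileReader and ToyContext are hypothetical stand-ins, and the only behavior they copy is the check described above, where start() sends a split request if no split is assigned yet. Running both orders against the same restored splits shows that start-then-addSplits issues one extra split request, while addSplits-then-start issues none.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Toy model (not Flink code) of the FileSourceReader#start() check:
 * start() asks for a new split only when no split is currently assigned.
 */
public class StartOrderDemo {

    /** Hypothetical stand-in for the reader context; it only counts split requests. */
    static class ToyContext {
        int splitRequests = 0;

        void sendSplitRequest() {
            splitRequests++;
        }
    }

    /** Hypothetical stand-in for FileSourceReader. */
    static class ToyFileReader {
        private final ToyContext context;
        private final List<String> assignedSplits = new ArrayList<>();

        ToyFileReader(ToyContext context) {
            this.context = context;
        }

        void start() {
            // Mirrors the described behavior: request a split only if none are assigned yet.
            if (assignedSplits.isEmpty()) {
                context.sendSplitRequest();
            }
        }

        void addSplits(List<String> splits) {
            assignedSplits.addAll(splits);
        }
    }

    /** Runs one ordering and returns how many split requests were sent. */
    static int run(boolean startFirst, List<String> restoredSplits) {
        ToyContext context = new ToyContext();
        ToyFileReader reader = new ToyFileReader(context);
        if (startFirst) {
            reader.start();                   // current HybridSourceReader order
            reader.addSplits(restoredSplits);
        } else {
            reader.addSplits(restoredSplits); // proposed order
            reader.start();
        }
        return context.splitRequests;
    }

    public static void main(String[] args) {
        List<String> restored = Arrays.asList("1000000/part-00000", "1000000/part-00001");
        System.out.println("start -> addSplits: " + run(true, restored) + " request(s)");  // 1
        System.out.println("addSplits -> start: " + run(false, restored) + " request(s)"); // 0
    }
}
```

In the real job, that one extra request is what lets the enumerator hand out splits from the next bucket while the restored splits are still pending.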
Here are some logs. In this experiment, the job should have started with files in the 1000000 bucket, but it jumped to bucket 2000000.
Job Manager
2022-12-27 13:38:47.155 StaticFileSplitEnumerator - Assigned split to subtask 1 : FileSourceSplit: s3a://bucket/2000000/part-00001-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97489087) hosts=[localhost] ID=0000000032 position=null
2022-12-27 13:38:47.156 StaticFileSplitEnumerator - Assigned split to subtask 9 : FileSourceSplit: s3a://bucket/2000000/part-00002-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97342071) hosts=[localhost] ID=0000000033 position=null
2022-12-27 13:38:47.156 StaticFileSplitEnumerator - Assigned split to subtask 6 : FileSourceSplit: s3a://bucket/2000000/part-00000-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97377047) hosts=[localhost] ID=0000000031 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to subtask 5 : FileSourceSplit: s3a://bucket/2000000/part-00003-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97406878) hosts=[localhost] ID=0000000034 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to subtask 2 : FileSourceSplit: s3a://bucket/2000000/part-00009-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97536205) hosts=[localhost] ID=0000000040 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to subtask 4 : FileSourceSplit: s3a://bucket/2000000/part-00004-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97420601) hosts=[localhost] ID=0000000035 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to subtask 8 : FileSourceSplit: s3a://bucket/2000000/part-00005-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97472256) hosts=[localhost] ID=0000000036 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to subtask 3 : FileSourceSplit: s3a://bucket/2000000/part-00006-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97495880) hosts=[localhost] ID=0000000037 position=null
2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to subtask 0 : FileSourceSplit: s3a://bucket/2000000/part-00007-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97389425) hosts=[localhost] ID=0000000038 position=null
2022-12-27 13:38:47.158 StaticFileSplitEnumerator - Assigned split to subtask 7 : FileSourceSplit: s3a://bucket/2000000/part-00008-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97428709) hosts=[localhost] ID=0000000039 position=null
Task Manager
2246:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00007-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79887236) hosts=[localhost] ID=0000000018 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226029]
2247:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00000-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79987191) hosts=[localhost] ID=0000000011 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226030]
2248:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00009-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80247830) hosts=[localhost] ID=0000000020 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226535]
2249:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00004-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80055663) hosts=[localhost] ID=0000000015 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226712]
2250:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00005-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80022187) hosts=[localhost] ID=0000000016 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226346]
2251:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00006-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80109242) hosts=[localhost] ID=0000000017 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=227284]
2252:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00001-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79980911) hosts=[localhost] ID=0000000012 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226429]
2253:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00003-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79996693) hosts=[localhost] ID=0000000014 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=227154]
2254:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00002-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80040476) hosts=[localhost] ID=0000000013 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=225920]
2255:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00008-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79986997) hosts=[localhost] ID=0000000019 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226278]
2265:2022-12-27 13:38:47.115 FileSourceSplitReader - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00006-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80109242) hosts=[localhost] ID=0000000017 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=227284]]
2266:2022-12-27 13:38:47.115 FileSourceSplitReader - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00004-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80055663) hosts=[localhost] ID=0000000015 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226712]]
2267:2022-12-27 13:38:47.115 FileSourceSplitReader - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00005-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80022187) hosts=[localhost] ID=0000000016 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226346]]
2268:2022-12-27 13:38:47.115 FileSourceSplitReader - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00003-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79996693) hosts=[localhost] ID=0000000014 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=227154]]
2269:2022-12-27 13:38:47.115 FileSourceSplitReader - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00009-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80247830) hosts=[localhost] ID=0000000020 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226535]]
2270:2022-12-27 13:38:47.115 FileSourceSplitReader - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00002-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80040476) hosts=[localhost] ID=0000000013 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=225920]]
2271:2022-12-27 13:38:47.115 FileSourceSplitReader - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00007-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79887236) hosts=[localhost] ID=0000000018 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226029]]
2272:2022-12-27 13:38:47.115 FileSourceSplitReader - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00001-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79980911) hosts=[localhost] ID=0000000012 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226429]]
2273:2022-12-27 13:38:47.115 FileSourceSplitReader - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00008-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79986997) hosts=[localhost] ID=0000000019 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226278]]
2275:2022-12-27 13:38:47.116 FileSourceSplitReader - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00000-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79987191) hosts=[localhost] ID=0000000011 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226030]]
2281:2022-12-27 13:38:47.160 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00009-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97536205) hosts=[localhost] ID=0000000040 position=null]
2282:2022-12-27 13:38:47.160 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00001-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97489087) hosts=[localhost] ID=0000000032 position=null]
2283:2022-12-27 13:38:47.159 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00002-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97342071) hosts=[localhost] ID=0000000033 position=null]
2284:2022-12-27 13:38:47.160 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00000-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97377047) hosts=[localhost] ID=0000000031 position=null]
2285:2022-12-27 13:38:47.160 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00003-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97406878) hosts=[localhost] ID=0000000034 position=null]
2288:2022-12-27 13:38:47.161 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00005-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97472256) hosts=[localhost] ID=0000000036 position=null]
2289:2022-12-27 13:38:47.161 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00004-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97420601) hosts=[localhost] ID=0000000035 position=null]
2292:2022-12-27 13:38:47.162 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00006-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97495880) hosts=[localhost] ID=0000000037 position=null]
2293:2022-12-27 13:38:47.163 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00007-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97389425) hosts=[localhost] ID=0000000038 position=null]
2295:2022-12-27 13:38:47.163 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00008-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97428709) hosts=[localhost] ID=0000000039 position=null]
The same logs are available in a GitHub gist: https://gist.github.com/WonderBeat/ddfdc852556997b09451d48766b54183
This can be fixed with a simple reordering in HybridSourceReader#setCurrentReader, where the reader is created: calling reader.addSplits before reader.start sounds logical. What do you think?
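A minimal sketch of the proposed reordering, assuming a simplified reader contract. The ToyReader interface and RestoredSplit class are hypothetical and are not Flink APIs. For illustration, the sketch also assumes that each restored split is tagged with the index of the source it belongs to, so only matching splits are handed to the newly created reader. The essential change is that start() is called only after the restored splits have been added.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Hypothetical, simplified sketch of the proposed ordering in setCurrentReader:
 * hand restored splits to the new reader first, then call start().
 * None of these types are Flink classes.
 */
public class ReorderedSetCurrentReader {

    /** Minimal reader contract used by the sketch. */
    interface ToyReader {
        void addSplits(List<String> splits);

        void start();
    }

    /** A restored split tagged with the index of its source (assumed for illustration). */
    static final class RestoredSplit {
        final int sourceIndex;
        final String split;

        RestoredSplit(int sourceIndex, String split) {
            this.sourceIndex = sourceIndex;
            this.split = split;
        }
    }

    /** Adds the restored splits for {@code index}, drops them from the pending list, then starts the reader. */
    static void setCurrentReader(int index, ToyReader reader, List<RestoredSplit> restoredSplits) {
        List<String> toAdd = new ArrayList<>();
        Iterator<RestoredSplit> it = restoredSplits.iterator();
        while (it.hasNext()) {
            RestoredSplit restored = it.next();
            if (restored.sourceIndex == index) {
                toAdd.add(restored.split);
                it.remove();
            }
        }
        if (!toAdd.isEmpty()) {
            reader.addSplits(toAdd);
        }
        // start() runs last, so a reader that requests splits only when it has none
        // already sees the restored splits and does not ask the enumerator for new ones.
        reader.start();
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        ToyReader reader = new ToyReader() {
            @Override
            public void addSplits(List<String> splits) {
                calls.add("addSplits" + splits);
            }

            @Override
            public void start() {
                calls.add("start");
            }
        };
        List<RestoredSplit> restored = new ArrayList<>();
        restored.add(new RestoredSplit(0, "1000000/part-00000"));
        restored.add(new RestoredSplit(1, "2000000/part-00000"));
        setCurrentReader(0, reader, restored);
        System.out.println(calls);           // [addSplits[1000000/part-00000], start]
        System.out.println(restored.size()); // 1 (the split for source 1 is still pending)
    }
}
```

This matches the order SourceOperator uses when restoring a regular, non-hybrid source reader, where restored splits are added before start() is called.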