Details
Type: Bug
Status: Closed
Priority: Blocker
Resolution: Fixed
Fix Versions: 2.0.0, 1.18.2, 1.20.0, 1.19.1
Description
FLINK-34063 fixed an important correctness issue with compressed state, but it introduced extremely slow state recovery from S3 for both uncompressed and compressed list states.
In short: restoring a ~6 MB list state generated by
org.apache.flink.connector.file.sink.compactor.operator.CompactCoordinator
takes ~62 hours.
Detailed analysis:
During file sink compaction, CompactCoordinator (running with parallelism 1) collects the list of files that need to be compacted and writes it into operator state. In the problematic scenario the list contained ~15k entries.
OperatorStateRestoreOperation.deserializeOperatorStateValues gets an offset for each and every list entry and does essentially the following:

    for (long offset : offsets) {
        in.seek(offset);
        stateListForName.add(serializer.deserialize(div));
    }
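To make the restore pattern concrete, here is a self-contained sketch that mimics the layout: each entry is serialized at a recorded offset, and restore seeks to every offset and deserializes one entry at a time. This is a simplified stand-in (plain `DataOutputStream`/`DataInputStream` over a byte array), not Flink's actual serializer or stream classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class OffsetRestoreSketch {

    public static List<String> writeAndRestore(int entryCount) throws IOException {
        // Write phase: serialize each entry, recording its start offset,
        // as the operator state snapshot does for list states.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        List<Long> offsets = new ArrayList<>();
        for (int i = 0; i < entryCount; i++) {
            offsets.add((long) out.size());
            out.writeUTF("file-" + i); // one "file name" entry
        }
        byte[] snapshot = bos.toByteArray();

        // Restore phase: seek to each offset and deserialize one entry,
        // mirroring the per-entry seek loop in deserializeOperatorStateValues.
        List<String> restored = new ArrayList<>();
        for (long offset : offsets) {
            ByteArrayInputStream in = new ByteArrayInputStream(snapshot);
            in.skip(offset); // stands in for in.seek(offset)
            restored.add(new DataInputStream(in).readUTF());
        }
        return restored;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(writeAndRestore(5));
    }
}
```

With thousands of entries, this loop issues one seek per entry, which is where the S3 cost described below comes in.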
CompressibleFSDataInputStream.seek introduced the following code:

    final int available = compressingDelegate.available();
    if (available > 0) {
        if (available != compressingDelegate.skip(available)) {
            throw new IOException("Unable to skip buffered data.");
        }
    }
There are two problems with this code:
- The skip operation is not needed for uncompressed state
- skip takes ~15 seconds for ~6 MB on S3, which adds up to a ~62 hour restore time
We've already addressed the first issue with a simple if condition, but the second is definitely harder. Until the latter is resolved, I would say that compressed state is not a good choice in combination with S3 and list state restore.
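The "simple if condition" fix for the first problem might look like the sketch below: only drain the delegate's buffered data when compression is actually enabled. The method and field names here (`drainBufferedData`, `compressionEnabled`) are hypothetical stand-ins, not Flink's actual identifiers.

```java
import java.io.IOException;
import java.io.InputStream;

// Sketch of guarding the buffered-data skip so it only runs for
// compressed streams; names are illustrative, not Flink's.
public class GuardedSeekSketch {

    public static void drainBufferedData(InputStream compressingDelegate,
                                         boolean compressionEnabled) throws IOException {
        if (!compressionEnabled) {
            return; // uncompressed state: nothing buffered to discard
        }
        final int available = compressingDelegate.available();
        if (available > 0) {
            if (available != compressingDelegate.skip(available)) {
                throw new IOException("Unable to skip buffered data.");
            }
        }
    }
}
```

This removes the unnecessary (and on S3, very expensive) skip for uncompressed state, while keeping the original behavior for compressed streams.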
Steps to reproduce:
- Create a list operator state with several thousand entries
- Take a checkpoint or savepoint to S3
- Restore the job from that checkpoint in Flink