Details
-
Bug
-
Status: Resolved
-
Critical
-
Resolution: Fixed
-
1.15.1
Description
Table API S3 streaming sink (CSV format) throws the following exception,
Caused by: org.apache.flink.util.SerializedThrowable: S3RecoverableFsDataOutputStream cannot sync state to S3. Use persist() to create a persistent recoverable intermediate point. at org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream.sync(RefCountedBufferingFileStream.java:111) ~[flink-s3-fs-hadoop-1.15.1.jar:1.15.1] at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.sync(S3RecoverableFsDataOutputStream.java:129) ~[flink-s3-fs-hadoop-1.15.1.jar:1.15.1] at org.apache.flink.formats.csv.CsvBulkWriter.finish(CsvBulkWriter.java:110) ~[flink-csv-1.15.1.jar:1.15.1] at org.apache.flink.connector.file.table.FileSystemTableSink$ProjectionBulkFactory$1.finish(FileSystemTableSink.java:642) ~[flink-connector-files-1.15.1.jar:1.15.1] at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:64) ~[flink-file-sink-common-1.15.1.jar:1.15.1] at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:263) ~[flink-streaming-java-1.15.1.jar:1.15.1] at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.prepareBucketForCheckpointing(Bucket.java:305) ~[flink-streaming-java-1.15.1.jar:1.15.1] at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onReceptionOfCheckpoint(Bucket.java:277) ~[flink-streaming-java-1.15.1.jar:1.15.1] at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotActiveBuckets(Buckets.java:270) ~[flink-streaming-java-1.15.1.jar:1.15.1] at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotState(Buckets.java:261) ~[flink-streaming-java-1.15.1.jar:1.15.1] at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.snapshotState(StreamingFileSinkHelper.java:87) ~[flink-streaming-java-1.15.1.jar:1.15.1] at org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.snapshotState(AbstractStreamingWriter.java:129) ~[flink-connector-files-1.15.1.jar:1.15.1]
In my table config, I am trying to read from Kafka and write to S3 (s3a) using table API and checkpoint configuration using s3p (presto). Even I tried with a simple datagen example instead of Kafka with local file system as checkpointing (`file:///` instead of `s3p://`) and I am getting the same issue. Exactly it is fails when the code triggers the checkpoint.
Some related slack conversation and SO conversation here, here and here
Since there is no work around for S3 table API streaming sink, I am marking this as critical. if this is not a relevant severity, feel free to reduce the priority.
Attachments
Issue Links
- causes
-
FLINK-33536 Flink Table API CSV streaming sink fails with "IOException: Stream closed"
- Open
- links to