Details
-
Bug
-
Status: Open
-
Not a Priority
-
Resolution: Unresolved
-
1.14.4, 1.15.0
-
None
Description
If FlinkRuntimeException occurs in the KafkaSink then the Flink job tries to re-send failed data again and gets into endless loop "exception->send again"
Code sample which throws the FlinkRuntimeException:
int numberOfRows = 1; int rowsPerSecond = 1; DataStream<String> stream = environment.addSource( new DataGeneratorSource<>( RandomGenerator.stringGenerator(1050000), // max.message.bytes=1048588 rowsPerSecond, (long) numberOfRows), TypeInformation.of(String.class)) .setParallelism(1) .name("string-generator"); KafkaSinkBuilder<String> builder = KafkaSink.<String>builder() .setBootstrapServers("localhost:9092") .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .setRecordSerializer( KafkaRecordSerializationSchema.builder().setTopic("test.output") .setValueSerializationSchema(new SimpleStringSchema()) .build()); KafkaSink<String> sink = builder.build(); stream.sinkTo(sink).setParallelism(1).name("output-producer");
Exception Stack Trace:
2022-06-02/14:01:45.066/PDT [flink-akka.actor.default-dispatcher-4] INFO output-producer: Writer -> output-producer: Committer (1/1) (a66beca5a05c1c27691f7b94ca6ac025) switched from RUNNING to FAILED on 271b1b90-7d6b-4a34-8116-3de6faa8a9bf @ 127.0.0.1 (dataPort=-1). org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka null with FlinkKafkaInternalProducer{transactionalId='null', inTransaction=false, closed=false} at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:440) ~[flink-connector-kafka-1.15.0.jar:1.15.0] at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:421) ~[flink-connector-kafka-1.15.0.jar:1.15.0] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-streaming-java-1.15.0.jar:1.15.0] at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-streaming-java-1.15.0.jar:1.15.0] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353) ~[flink-streaming-java-1.15.0.jar:1.15.0] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) ~[flink-streaming-java-1.15.0.jar:1.15.0] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) ~[flink-streaming-java-1.15.0.jar:1.15.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-streaming-java-1.15.0.jar:1.15.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-streaming-java-1.15.0.jar:1.15.0] at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-runtime-1.15.0.jar:1.15.0] at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-runtime-1.15.0.jar:1.15.0] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-runtime-1.15.0.jar:1.15.0] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-runtime-1.15.0.jar:1.15.0] at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292] Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1050088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
**