FLINK-27963

FlinkRuntimeException in KafkaSink causes a Flink job to hang

Details

    Description

      If a FlinkRuntimeException occurs in the KafkaSink, the Flink job tries to re-send the failed data and gets stuck in an endless "exception -> send again" loop.

      Code sample which throws the FlinkRuntimeException:

      int numberOfRows = 1;
      int rowsPerSecond = 1;

      // Emit a single 1050000-character random string, larger than the limit noted below.
      DataStream<String> stream = environment.addSource(
                      new DataGeneratorSource<>(
                              RandomGenerator.stringGenerator(1050000), // max.message.bytes=1048588
                              rowsPerSecond,
                              (long) numberOfRows),
                      TypeInformation.of(String.class))
              .setParallelism(1)
              .name("string-generator");

      // At-least-once Kafka sink that writes plain strings to the "test.output" topic.
      KafkaSinkBuilder<String> builder = KafkaSink.<String>builder()
              .setBootstrapServers("localhost:9092")
              .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
              .setRecordSerializer(
                      KafkaRecordSerializationSchema.builder().setTopic("test.output")
                              .setValueSerializationSchema(new SimpleStringSchema())
                              .build());

      KafkaSink<String> sink = builder.build();

      stream.sinkTo(sink).setParallelism(1).name("output-producer");
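
      The generated string is deliberately larger than the Kafka producer's default max.request.size (1048576 bytes). A possible way to let such a record through, sketched under assumptions: the properties built below would be handed to the sink via KafkaSinkBuilder#setKafkaProducerConfig, and the broker or topic limit (max.message.bytes) would have to be raised as well. The class name, the helper name, and the 2 MiB value are illustrative and not part of the report.

```java
import java.util.Properties;

class ProducerLimits {
    // Hypothetical helper: producer overrides allowing requests up to maxRequestBytes.
    static Properties withMaxRequestSize(int maxRequestBytes) {
        Properties props = new Properties();
        // "max.request.size" is a standard Kafka producer setting (default 1048576).
        props.setProperty("max.request.size", Integer.toString(maxRequestBytes));
        return props;
    }

    public static void main(String[] args) {
        // Illustrative value only: 2 MiB.
        Properties props = withMaxRequestSize(2 * 1024 * 1024);
        System.out.println(props.getProperty("max.request.size"));
    }
}
```

      This only avoids the oversized-record failure in the reproducer; it does not address the re-send loop that the issue describes.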

      Exception Stack Trace:

      2022-06-02/14:01:45.066/PDT [flink-akka.actor.default-dispatcher-4] INFO output-producer: Writer -> output-producer: Committer (1/1) (a66beca5a05c1c27691f7b94ca6ac025) switched from RUNNING to FAILED on 271b1b90-7d6b-4a34-8116-3de6faa8a9bf @ 127.0.0.1 (dataPort=-1).
      org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka null with FlinkKafkaInternalProducer{transactionalId='null', inTransaction=false, closed=false}
          at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:440) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
          at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:421) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
          at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-streaming-java-1.15.0.jar:1.15.0]
          at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-streaming-java-1.15.0.jar:1.15.0]
          at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353) ~[flink-streaming-java-1.15.0.jar:1.15.0]
          at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) ~[flink-streaming-java-1.15.0.jar:1.15.0]
          at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) ~[flink-streaming-java-1.15.0.jar:1.15.0]
          at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-streaming-java-1.15.0.jar:1.15.0]
          at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-streaming-java-1.15.0.jar:1.15.0]
          at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-runtime-1.15.0.jar:1.15.0]
          at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-runtime-1.15.0.jar:1.15.0]
          at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-runtime-1.15.0.jar:1.15.0]
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-runtime-1.15.0.jar:1.15.0]
          at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
      Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1050088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
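
      The numbers in the trace line up with the reproducer: the 1050000-character string serialized to 1050088 bytes, which is 88 bytes more than the payload and 1512 bytes over the 1048576-byte max.request.size. A minimal sketch of a pre-send size check follows. It assumes UTF-8 values with one byte per character, and it treats the 88-byte overhead as an observation from this one trace, not as a Kafka constant. RecordSizeCheck and its methods are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class RecordSizeCheck {
    // Limit reported in the exception (the producer's max.request.size).
    static final int MAX_REQUEST_SIZE = 1_048_576;
    // Overhead seen in this trace only: 1050088 serialized bytes - 1050000 payload bytes.
    static final int OBSERVED_OVERHEAD = 88;

    // Estimated serialized size: UTF-8 payload bytes plus the observed overhead.
    static int estimatedSize(String value) {
        return value.getBytes(StandardCharsets.UTF_8).length + OBSERVED_OVERHEAD;
    }

    // True if the estimate stays within the request limit.
    static boolean fits(String value) {
        return estimatedSize(value) <= MAX_REQUEST_SIZE;
    }

    // Builds an ASCII string of the given length (Java 8 compatible).
    static String ascii(int length) {
        char[] chars = new char[length];
        Arrays.fill(chars, 'a');
        return new String(chars);
    }

    public static void main(String[] args) {
        String big = ascii(1_050_000);
        System.out.println(estimatedSize(big) + " bytes, fits=" + fits(big));
    }
}
```

      A check like this could filter or route oversized records before they reach the sink, so that a single bad record does not fail the writer.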

          People

            Assignee: Unassigned
            Reporter: igaevd Dmytro