FLINK-25126

FlinkKafkaInternalProducer state is not reset if transaction finalization fails

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.14.0, 1.15.0
    • Fix Version/s: 1.14.3, 1.15.0
    • Component/s: Connectors / Kafka

    Description

      A Flink SQL job submitted through the SQL client fails.

      This is the SQL:

      SET 'execution.runtime-mode' = 'batch';

      CREATE TABLE ka15 (
          name STRING,
          cnt BIGINT
      ) WITH (
          'connector' = 'kafka',
          'topic' = 'shifang8888',
          'properties.bootstrap.servers' = 'flinkx1:9092',
          'properties.transaction.timeout.ms' = '800000',
          'properties.max.block.ms' = '300000',
          'value.format' = 'json',
          'sink.parallelism' = '2',
          'sink.delivery-guarantee' = 'exactly-once',
          'sink.transactional-id-prefix' = 'dtstack9999'
      );

      INSERT INTO ka15
      SELECT
          name,
          cnt
      FROM
          (VALUES ('Bob', 100), ('Alice', 100), ('Greg', 100), ('Bob', 100)) AS NameTable(name, cnt);

      This is the error:

      Caused by: java.lang.IllegalStateException
      at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177)
      at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.setTransactionId(FlinkKafkaInternalProducer.java:164)
      at org.apache.flink.connector.kafka.sink.KafkaCommitter.getRecoveryProducer(KafkaCommitter.java:144)
      at org.apache.flink.connector.kafka.sink.KafkaCommitter.lambda$commit$0(KafkaCommitter.java:76)
      at java.util.Optional.orElseGet(Optional.java:267)
      at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:76)
      ... 14 more
       
       
      I found the reason the Kafka commit fails: while the downstream CommitterOperator is still committing the transaction, the upstream SinkOperator has already closed, and closing it aborts the transaction that the CommitterOperator is committing. This occurs when execution.runtime-mode is batch.
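
      The failure mode named in the issue title can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions: SketchProducer, its inTransaction flag, and the resetOnFailure switch are hypothetical names invented for illustration, not the real FlinkKafkaInternalProducer code, and the try/finally reset is one possible remedy rather than the actual patch. It shows how a failed commit that leaves the "in transaction" flag set makes a later setTransactionId call throw IllegalStateException, as in the stack trace above.

```java
public class ProducerStateSketch {

    /** Hypothetical stand-in for a transactional producer; not the Flink class. */
    static class SketchProducer {
        private String transactionalId;
        private boolean inTransaction;
        // Assumption for illustration: toggles between the buggy and the corrected behavior.
        private final boolean resetOnFailure;

        SketchProducer(String transactionalId, boolean resetOnFailure) {
            this.transactionalId = transactionalId;
            this.resetOnFailure = resetOnFailure;
        }

        void beginTransaction() {
            inTransaction = true;
        }

        /** Commits; brokerRejects simulates a transaction that was aborted concurrently. */
        void commitTransaction(boolean brokerRejects) {
            if (resetOnFailure) {
                try {
                    finalizeTransaction(brokerRejects);
                } finally {
                    // State is cleared even if finalization throws.
                    inTransaction = false;
                }
            } else {
                finalizeTransaction(brokerRejects);
                // Never reached on failure, so the flag stays set.
                inTransaction = false;
            }
        }

        private void finalizeTransaction(boolean brokerRejects) {
            if (brokerRejects) {
                throw new RuntimeException("transaction was aborted concurrently");
            }
        }

        /** Mirrors a state check: the id may only change when no transaction is open. */
        void setTransactionId(String id) {
            if (inTransaction) {
                throw new IllegalStateException("another transaction is still open");
            }
            transactionalId = id;
        }
    }

    /** Returns true if the producer can be reused after a failed commit. */
    public static boolean reuseAfterFailedCommit(boolean resetOnFailure) {
        SketchProducer producer = new SketchProducer("prefix-0", resetOnFailure);
        producer.beginTransaction();
        try {
            producer.commitTransaction(true);
        } catch (RuntimeException expected) {
            // The commit failure itself is expected in this scenario.
        }
        try {
            producer.setTransactionId("prefix-1");
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("without reset, reusable: " + reuseAfterFailedCommit(false));
        System.out.println("with reset, reusable: " + reuseAfterFailedCommit(true));
    }
}
```

      In this model the producer is reusable after a failed commit only when the flag is cleared in a finally block; without the reset, the next setTransactionId call fails the state check.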

      People

        Assignee: fpaul Fabian Paul
        Reporter: tonyboo9527 eric yu