Details
- Type: Bug
- Status: Resolved
- Priority: Blocker
- Resolution: Done
- Affects Version/s: 1.14.0
- Fix Version/s: None
Description
We experienced a RuntimeException in a test run for FLINK-23850:
java.lang.RuntimeException: Failed to send data to Kafka: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
    at org.apache.flink.connector.kafka.sink.KafkaWriter.checkErroneous(KafkaWriter.java:263) ~[flink-sql-connector-kafka_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:178) ~[flink-sql-connector-kafka_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:161) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at StreamExecCalc$6.processElement(Unknown Source) ~[?:?]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:36) ~[flink-sql-connector-kafka_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:27) ~[flink-sql-connector-kafka_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:141) ~[flink-table_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:341) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:490) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:789) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:741) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
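For context on the broker-side state the message refers to: with exactly-once delivery, the sink's KafkaWriter relies on Kafka's transactional producer, whose producerId is registered with the broker on initTransactions() and dropped again once all of the producer's records have aged out of retention. Below is a minimal standalone sketch of that lifecycle using the plain Kafka client API; the broker address, transactional id, and topic are placeholders, and this is not the Flink sink's internal code.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder address; the test job used FLINK23850Utils.BOOTSTRAP_SERVERS.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Setting a transactional.id is what creates the broker-side producer
        // metadata (producerId + epoch) that the exception says could not be found.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "flink-23850-demo-0");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions(); // registers the producerId with the broker
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("flink-23850-out", "pk", "value"));
            producer.commitTransaction();
        } catch (KafkaException e) {
            // UnknownProducerIdException surfaces here if the broker has already
            // dropped the producer metadata, e.g. after the records' retention
            // time elapsed; the producer has to be recreated in that case.
            throw new RuntimeException("Failed to send data to Kafka", e);
        }
    }
}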
Test job executed:
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(20, 2000));
env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
env.setParallelism(6);

final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

tableEnv.createTable("T1", TableDescriptor.forConnector("kafka")
        .schema(Schema.newBuilder()
                .column("pk", DataTypes.STRING().notNull())
                .column("x", DataTypes.STRING().notNull())
                .build())
        .option("topic", "flink-23850-in1")
        .option("properties.bootstrap.servers", FLINK23850Utils.BOOTSTRAP_SERVERS)
        .option("value.format", "csv")
        .option("scan.startup.mode", "earliest-offset")
        .build());

final Table resultTable = tableEnv.sqlQuery(
        "SELECT "
        + "T1.pk, "
        + "'asd', "
        + "'foo', "
        + "'bar' "
        + "FROM T1");

tableEnv.createTable("T4", TableDescriptor.forConnector("kafka")
        .schema(Schema.newBuilder()
                .column("pk", DataTypes.STRING().notNull())
                .column("some_calculated_value", DataTypes.STRING())
                .column("pk1", DataTypes.STRING())
                .column("pk2", DataTypes.STRING())
                .build())
        .option("topic", "flink-23850-out")
        .option("properties.bootstrap.servers", FLINK23850Utils.BOOTSTRAP_SERVERS)
        .option("value.format", "csv")
        .option("sink.delivery-guarantee", "exactly-once")
        .option("sink.transactional-id-prefix", "flink-23850")
        .option("scan.startup.mode", "earliest-offset")
        .build());

resultTable.executeInsert("T4");
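Note that with sink.delivery-guarantee set to exactly-once, downstream readers of flink-23850-out only see consistent data when consuming with isolation.level=read_committed; the default read_uncommitted also returns records from open or aborted sink transactions. A minimal verification sketch with the plain Kafka consumer API (broker address and group id are placeholders):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadCommittedConsumerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-23850-verify");      // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Only return records from committed transactions; with the default
        // read_uncommitted, aborted sink transactions would be visible too.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("flink-23850-out"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}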
Issue Links
- Discovered while testing: FLINK-23850 Test Kafka table connector with new runtime provider (Closed)
- Is superseded by: FLINK-24131 KafkaSinkITCase "Detected producer leak" (Resolved)
This is a Kafka broker issue, probably the root cause of KAFKA-9310. From what I can see, it is some kind of concurrency issue where the broker is simply unable to write to a transaction right after another transaction was committed. We could reliably reproduce the issue with a Kafka broker running 2.4.1, but were unable to reproduce it on 2.5.x or 2.7.x.

The solution is similar to KAFKA-9310: just restart. However, Flink users probably experience longer downtime on average because of larger applications and state. Hence, we will give direct feedback to upgrade Kafka to 2.5+. This will be solved as part of FLINK-24131.
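To illustrate the upgrade advice: in a containerized test setup, the broker can simply be pinned to a 2.5+ image. A hedged sketch using Testcontainers, assuming the confluentinc/cp-kafka:5.5.2 image (Confluent Platform 5.5.x bundles Kafka 2.5.x); this is not part of this issue's reproduction harness:

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public class Kafka25BrokerSketch {

    public static void main(String[] args) {
        // Confluent Platform 5.5.x bundles Kafka 2.5.x, i.e. a broker version
        // on which we could not reproduce the UnknownProducerIdException.
        try (KafkaContainer kafka =
                new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.2"))) {
            kafka.start();
            // Point the job's properties.bootstrap.servers option at this address.
            System.out.println(kafka.getBootstrapServers());
        }
    }
}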