Details
- Type: Bug
- Status: Closed
- Priority: Minor
- Resolution: Workaround
- Affects Version/s: 1.7.1
Description
We run multiple jobs on a cluster that write heavily to the same Kafka topic from identically named sinks. When the EXACTLY_ONCE semantic is enabled for the KafkaProducers, we run into a lot of ProducerFencedExceptions and all jobs go into a restart cycle.
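For illustration, a minimal sketch of the kind of setup described, with several jobs each submitting a pipeline like the one below. The topic, bootstrap servers, checkpoint interval, and operator name are illustrative, and the constructor used (KeyedSerializationSchema plus Semantic) is just one of the available overloads:

// Repro sketch (assumptions marked): each job runs a class like this against the same topic.
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class ExactlyOnceSinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // EXACTLY_ONCE transactions are committed on checkpoints

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");   // illustrative
        props.setProperty("transaction.timeout.ms", "900000");  // keep well above the checkpoint interval

        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
            "finding-commands-dev-1",                            // same topic written by all jobs
            new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
            props,
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        // All jobs attach the sink under the same operator name, so the
        // transactional id prefix (task name + operator id) can collide across jobs.
        env.fromElements("a", "b", "c")
           .addSink(producer)
           .name("kafka-sink");

        env.execute("job-writing-to-shared-topic");
    }
}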
Example exception from the Kafka log:
[2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing append operation on partition finding-commands-dev-1-0 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is no longer valid. There is probably another producer with a newer epoch. 483 (request epoch), 484 (server epoch)
The reason for this is the way FlinkKafkaProducer initializes the TransactionalIdsGenerator: the prefix is built only from the task name and the operator's unique ID, so the generated transactional IDs are only guaranteed to be unique within a single job. They can clash between different jobs (and clusters).
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -819,6 +819,7 @@ public class FlinkKafkaProducer<IN>
 		nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
 			NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
 		transactionalIdsGenerator = new TransactionalIdsGenerator(
+			// the prefix probably should include job id and maybe cluster id
 			getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
 			getRuntimeContext().getIndexOfThisSubtask(),
 			getRuntimeContext().getNumberOfParallelSubtasks(),
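Since the resolution here is a workaround: on affected versions the prefix can be made job-specific indirectly by giving the sink a per-job operator name and uid, since both feed into the prefix shown in the diff above. A sketch assuming a hypothetical job-specific suffix supplied at deployment time (stream, producer, and args reuse the names from the setup sketch); FLINK-22452, linked below, later added support for an explicitly configured transactional.id prefix:

// Workaround sketch: bake a per-job suffix into the sink's operator name and uid
// so that taskName + "-" + operatorUniqueID differs between jobs.
String jobSuffix = args.length > 0 ? args[0] : "job-a"; // hypothetical, supplied per deployment

stream.addSink(producer)
      .name("kafka-sink-" + jobSuffix)  // distinct task name per job
      .uid("kafka-sink-" + jobSuffix);  // distinct, stable operator id per job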
Attachments

Issue Links
- duplicates: FLINK-22785 Kafka transaction failing when there are two producers (Open)
- is related to: FLINK-22452 Support specifying custom transactional.id prefix in FlinkKafkaProducer (Closed)