Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Fixed
-
1.10.0, 1.11.0
Description
When sink to Kafka using the Semantic.EXACTLY_ONCE mode.
The flink Kafka Connector Producer will auto set the transactional.id, and the user - defined value are ignored.
When the job operator name too long, will send failed
transactional.id is exceeds the kafka coordinator_key limit
The flink Kafka Connector policy for automatic generation of transaction.id is as follows
1. use the taskName + "-" + operatorUniqueID as transactional.id prefix (may be too long)
getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID()
2. Range of available transactional ids
[nextFreeTransactionalId, nextFreeTransactionalId + parallelism * kafkaProducersPoolSize)
The Kafka transaction.id check policy as follows:
string bytes.length can't larger than Short.MAX_VALUE (32767)
To reproduce this bug, the following conditions must be met:
- send msg to kafka with exactly once mode
- the task TaskName' length + TaskName's length is lager than the 32767 (A very long line of SQL or window statements can appear)
I suggest a solution:
1. Allows users to customize transactional.id 's prefix
or
2. Do md5 on the prefix before returning the real transactional.id
Attachments
Attachments
Issue Links
- is duplicated by
-
FLINK-20227 Kafka transaction IDs exceeding limit
- Closed
- relates to
-
FLINK-22452 Support specifying custom transactional.id prefix in FlinkKafkaProducer
- Closed
- links to