Details
-
Bug
-
Status: Resolved
-
Blocker
-
Resolution: Fixed
-
1.14.0, 1.15.0
-
SET 'execution.runtime-mode' = 'batch';
CREATE TABLE ka15 (
name String,
cnt bigint
) WITH (
'connector' = 'kafka',
'topic' = 'shifang8888',
'properties.bootstrap.servers' = 'flinkx1:9092',
'properties.transaction.timeout.ms' = '800000',
'properties.max.block.ms' = '300000',
'value.format' = 'json',
'sink.parallelism' = '2',
'sink.delivery-guarantee' = 'exactly-once',
'sink.transactional-id-prefix' = 'dtstack9999');insert into ka15 SELECT
name,
cnt
FROM
(VALUES ('Bob',100), ('Alice',100), ('Greg',100), ('Bob',100)) AS NameTable(name,cnt);SET 'execution.runtime-mode' = 'batch'; CREATE TABLE ka15 ( name String, cnt bigint ) WITH ( 'connector' = 'kafka', 'topic' = 'shifang8888', 'properties.bootstrap.servers' = 'flinkx1:9092', 'properties.transaction.timeout.ms' = '800000', 'properties.max.block.ms' = '300000', 'value.format' = 'json', 'sink.parallelism' = '2', 'sink.delivery-guarantee' = 'exactly-once', 'sink.transactional-id-prefix' = 'dtstack9999'); insert into ka15 SELECT name, cnt FROM (VALUES ('Bob',100), ('Alice',100), ('Greg',100), ('Bob',100)) AS NameTable(name,cnt);
Description
flinksql task submitted by sql client will failed,
this is the sql :
SET 'execution.runtime-mode' = 'batch';
CREATE TABLE ka15 (
name String,
cnt bigint
) WITH (
'connector' = 'kafka',
'topic' = 'shifang8888',
'properties.bootstrap.servers' = 'flinkx1:9092',
'properties.transaction.timeout.ms' = '800000',
'properties.max.block.ms' = '300000',
'value.format' = 'json',
'sink.parallelism' = '2',
'sink.delivery-guarantee' = 'exactly-once',
'sink.transactional-id-prefix' = 'dtstack9999');
insert into ka15 SELECT
name,
cnt
FROM
(VALUES ('Bob',100), ('Alice',100), ('Greg',100), ('Bob',100)) AS NameTable(name,cnt);
this is the error:
Caused by: java.lang.IllegalStateException
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177)
at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.setTransactionId(FlinkKafkaInternalProducer.java:164)
at org.apache.flink.connector.kafka.sink.KafkaCommitter.getRecoveryProducer(KafkaCommitter.java:144)
at org.apache.flink.connector.kafka.sink.KafkaCommitter.lambda$commit$0(KafkaCommitter.java:76)
at java.util.Optional.orElseGet(Optional.java:267)
at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:76)
... 14 more
i found the reason why kafka commit failed, when downstream operator CommitterOperator was commiting transaction, the upstream operator SinkOperator has closed , it will abort the transaction which is committing by CommitterOperator, this is occurs when execution.runtime-mode is batch