Details
-
Bug
-
Status: Resolved
-
Blocker
-
Resolution: Fixed
-
1.14.0
Description
The KafkaSink throws the exception when restarted with a lower parallelism and the exactly-once guarantee. The exception is like this.
java.lang.IllegalStateException: Internal error: It is expected that state from previous executions is distributed to the same subtask id. at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) at org.apache.flink.connector.kafka.sink.KafkaWriter.recoverAndInitializeState(KafkaWriter.java:178) at org.apache.flink.connector.kafka.sink.KafkaWriter.<init>(KafkaWriter.java:130) at org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:99) at org.apache.flink.streaming.runtime.operators.sink.SinkOperator.initializeState(SinkOperator.java:134) at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:690) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:666) at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:785) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:638) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:572) at java.lang.Thread.run(Thread.java:748) Suppressed: java.lang.NullPointerException at org.apache.flink.streaming.runtime.operators.sink.SinkOperator.close(SinkOperator.java:195) at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:141) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:127) at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1028) at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:1014) at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:927) at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:797) ... 4 more
I start the kafka cluster(kafka_2.13-2.8.0) and the flink cluster in my own mac. I change the parallelism from 4 to 2 and restart the job from some completed checkpoint. Then the error occurs.
And the cli command and the code are as follows.
// cli command
./bin/flink run -d -c com.test.KafkaExactlyOnceScaleDownTest -s /Users/test/checkpointDir/ExactlyOnceTest1/67105fcc1724e147fc6208af0dd90618/chk-1 /Users/test/project/self/target/test.jar
public class KafkaExactlyOnceScaleDownTest { public static void main(String[] args) throws Exception { final String kafkaSourceTopic = "flinkSourceTest"; final String kafkaSinkTopic = "flinkSinkExactlyTest1"; final String groupId = "ExactlyOnceTest1"; final String brokers = "localhost:9092"; final String ckDir = "file:///Users/test/checkpointDir/" + groupId; final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.getCheckpointConfig().setCheckpointStorage(ckDir); env.setParallelism(4); KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics(kafkaSourceTopic) .setGroupId(groupId) .setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); DataStream<String> flintstones = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source"); DataStream<String> adults = flintstones.filter(s -> s != null && s.length() > 2); Properties props = new Properties(); props.setProperty("transaction.timeout.ms", "900000"); adults.sinkTo(KafkaSink.<String>builder() .setBootstrapServers(brokers) .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE) .setTransactionalIdPrefix("tp-test-") .setKafkaProducerConfig(props) .setRecordSerializer(new SelfSerializationSchema(kafkaSinkTopic, new SimpleStringSchema())) .build()); env.execute("ScaleDownTest"); } static class SelfSerializationSchema implements KafkaRecordSerializationSchema<String> { private final SerializationSchema<String> valueSerialization; private String topic; SelfSerializationSchema(String topic, SerializationSchema<String> valueSerialization){ this.valueSerialization = valueSerialization; this.topic = topic; } @Override public void open(SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext) throws Exception { KafkaRecordSerializationSchema.super.open(context, sinkContext); } @Override public ProducerRecord<byte[], byte[]> serialize(String s, KafkaSinkContext kafkaSinkContext, Long aLong) { final byte[] valueSerialized = valueSerialization.serialize(s); return new ProducerRecord<>(topic, valueSerialized); } } }
Attachments
Issue Links
- is related to
-
FLINK-27787 New tasks think they've been restored from savepoint (even when they weren't present in that savepoint)
- Open
-
FLINK-23896 The new KafkaSink drops data if job fails between checkpoint and transaction commit.
- Resolved
- links to