
FLINK-23854: KafkaSink fails when restarting from a checkpoint with lower parallelism under the exactly-once guarantee


    Description

      The KafkaSink throws an exception when the job is restarted with a lower parallelism under the exactly-once delivery guarantee. The exception is as follows.

      java.lang.IllegalStateException: Internal error: It is expected that state from previous executions is distributed to the same subtask id.   
      at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)   
      at org.apache.flink.connector.kafka.sink.KafkaWriter.recoverAndInitializeState(KafkaWriter.java:178)   
      at org.apache.flink.connector.kafka.sink.KafkaWriter.<init>(KafkaWriter.java:130)   
      at org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:99)   
      at org.apache.flink.streaming.runtime.operators.sink.SinkOperator.initializeState(SinkOperator.java:134)   
      at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)   
      at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)   
      at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109)   
      at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:690)   
      at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)   
      at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:666)   
      at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:785)   
      at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:638)   
      at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)   
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:572)   
      at java.lang.Thread.run(Thread.java:748)    
      Suppressed: java.lang.NullPointerException       
      at org.apache.flink.streaming.runtime.operators.sink.SinkOperator.close(SinkOperator.java:195)       
      at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:141)       
      at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:127)       
      at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1028)       
      at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:1014)       
      at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:927)       
      at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:797)
      ... 4 more
      

      I start the Kafka cluster (kafka_2.13-2.8.0) and the Flink cluster on my own Mac. I change the parallelism from 4 to 2 and restart the job from a completed checkpoint. Then the error occurs.
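
      My reading of the failure, as a minimal self-contained sketch (the WriterState class and the round-robin redistribution below are my own stand-ins, not Flink's actual code): when the job scales from 4 subtasks down to 2, a new subtask is handed writer state from more than one old subtask, while the precondition at KafkaWriter.recoverAndInitializeState:178 in the trace requires every recovered state to carry the current subtask id.

      import java.util.Arrays;
      import java.util.List;

      // Illustration only: models why a strict "same subtask id" check cannot
      // hold when scaling down. Not Flink source code.
      public class ScaleDownSketch {
          static final class WriterState {
              final int subtaskId;
              WriterState(int subtaskId) { this.subtaskId = subtaskId; }
          }

          public static void main(String[] args) {
              // States snapshotted by the old execution with parallelism 4.
              List<WriterState> oldStates = Arrays.asList(
                      new WriterState(0), new WriterState(1),
                      new WriterState(2), new WriterState(3));
              int newParallelism = 2;
              for (int newSubtask = 0; newSubtask < newParallelism; newSubtask++) {
                  for (WriterState state : oldStates) {
                      // Assumed round-robin redistribution: new subtask 0 is
                      // handed the states of old subtasks 0 and 2.
                      if (state.subtaskId % newParallelism != newSubtask) {
                          continue;
                      }
                      // Paraphrase of the precondition that fails at
                      // KafkaWriter.recoverAndInitializeState:178:
                      if (state.subtaskId != newSubtask) {
                          throw new IllegalStateException(
                                  "Internal error: It is expected that state from previous "
                                          + "executions is distributed to the same subtask id.");
                      }
                  }
              }
          }
      }

      Running the sketch throws on the state of old subtask 2 arriving at new subtask 0, which matches the message above.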

      The CLI command and the job code are as follows.

      // cli command
      ./bin/flink run -d -c com.test.KafkaExactlyOnceScaleDownTest -s /Users/test/checkpointDir/ExactlyOnceTest1/67105fcc1724e147fc6208af0dd90618/chk-1 /Users/test/project/self/target/test.jar
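
      (Note: the scale-down itself is done in the job code, since the hard-coded env.setParallelism takes precedence over any CLI setting. If that call were removed, the lowered parallelism could presumably be passed with the -p flag instead; a hypothetical variant of the command above:)

      // hypothetical variant: pass the lowered parallelism on the CLI
      ./bin/flink run -d -p 2 -c com.test.KafkaExactlyOnceScaleDownTest -s /Users/test/checkpointDir/ExactlyOnceTest1/67105fcc1724e147fc6208af0dd90618/chk-1 /Users/test/project/self/target/test.jar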
      
      import org.apache.flink.api.common.eventtime.WatermarkStrategy;
      import org.apache.flink.api.common.serialization.SerializationSchema;
      import org.apache.flink.api.common.serialization.SimpleStringSchema;
      import org.apache.flink.connector.base.DeliveryGuarantee;
      import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
      import org.apache.flink.connector.kafka.sink.KafkaSink;
      import org.apache.flink.connector.kafka.source.KafkaSource;
      import org.apache.flink.streaming.api.CheckpointingMode;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.CheckpointConfig;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import java.util.Properties;

      public class KafkaExactlyOnceScaleDownTest {
          public static void main(String[] args) throws Exception {
              final String kafkaSourceTopic = "flinkSourceTest";
              final String kafkaSinkTopic = "flinkSinkExactlyTest1";
              final String groupId = "ExactlyOnceTest1";
              final String brokers = "localhost:9092";
              final String ckDir = "file:///Users/test/checkpointDir/" + groupId;

              final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.enableCheckpointing(60000);
              env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
              env.getCheckpointConfig().enableExternalizedCheckpoints(
                      CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
              env.getCheckpointConfig().setCheckpointStorage(ckDir);
              env.setParallelism(4); // changed to 2 before restarting from the checkpoint

              KafkaSource<String> source = KafkaSource.<String>builder()
                      .setBootstrapServers(brokers)
                      .setTopics(kafkaSourceTopic)
                      .setGroupId(groupId)
                      .setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
                      .setValueOnlyDeserializer(new SimpleStringSchema())
                      .build();

              DataStream<String> flintstones = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
              DataStream<String> adults = flintstones.filter(s -> s != null && s.length() > 2);

              Properties props = new Properties();
              props.setProperty("transaction.timeout.ms", "900000");

              adults.sinkTo(KafkaSink.<String>builder()
                      .setBootstrapServers(brokers)
                      .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                      .setTransactionalIdPrefix("tp-test-")
                      .setKafkaProducerConfig(props)
                      .setRecordSerializer(new SelfSerializationSchema(kafkaSinkTopic, new SimpleStringSchema()))
                      .build());

              env.execute("ScaleDownTest");
          }

          static class SelfSerializationSchema implements KafkaRecordSerializationSchema<String> {
              private final SerializationSchema<String> valueSerialization;
              private final String topic;

              SelfSerializationSchema(String topic, SerializationSchema<String> valueSerialization) {
                  this.valueSerialization = valueSerialization;
                  this.topic = topic;
              }

              @Override
              public void open(SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext) throws Exception {
                  KafkaRecordSerializationSchema.super.open(context, sinkContext);
              }

              @Override
              public ProducerRecord<byte[], byte[]> serialize(String s, KafkaSinkContext kafkaSinkContext, Long aLong) {
                  final byte[] valueSerialized = valueSerialization.serialize(s);
                  return new ProducerRecord<>(topic, valueSerialized);
              }
          }
      }
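
      To drive the job, I push records into the source topic with the stock Kafka console producer (broker address and topic name taken from the code above; any producer would do):

      ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic flinkSourceTest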
      


      People

        Assignee: Arvid Heise
        Reporter: Hang Ruan