FLINK-23674

flink restart with checkpoint, Kafka producer throws exception


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.13.1
    • Fix Version/s: None
    • Component/s: Connectors / Kafka
    • Labels: None
    • Environment:

      flink: flink-1.13.1

      kafka: kafka_2.12-2.5.0

      java: 1.8.0_161

    Description

       

      While testing Flink exactly-once semantics (EOS) with a Kafka sink, I first clicked the Cancel button on the Flink web UI, then entered the following command on the console:

      bin/flink run -n -c com.shanjiancaofu.live.job.ChargeJob -s file:/soft/opt/checkpoint/072c0a72343c6e1f06b9bd37c5147cc0/chk-1/_metadata ./ad-live-process-0.11-jar-with-dependencies.jar
      

      About 10 seconds later, the job threw this exception:

      Caused by: org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
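
      The second clause of the message ("a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker") describes Kafka's transactional-ID fencing and transaction expiry. Below is a minimal sketch of the fencing mechanism outside Flink, using plain kafka-clients (broker address, topic, and transactional.id are reused from the job below; this only illustrates how the error arises, it is not a confirmed diagnosis of this issue):

      import java.util.Properties;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.common.errors.ProducerFencedException;

      public class EpochFencingDemo {

         private static KafkaProducer<String, String> newProducer() {
            Properties props = new Properties();
            props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.17.81:9092");
            props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
               "org.apache.kafka.common.serialization.StringSerializer");
            props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
               "org.apache.kafka.common.serialization.StringSerializer");
            // the same transactional.id for both instances, as in the job below
            props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "user-event-processd-tr");
            return new KafkaProducer<>(props);
         }

         public static void main(String[] args) {
            KafkaProducer<String, String> first = newProducer();
            first.initTransactions();      // registers the transactional.id, gets epoch N
            first.beginTransaction();
            first.send(new ProducerRecord<>("dsp-user-event-processd", "k", "v1"));

            KafkaProducer<String, String> second = newProducer();
            second.initTransactions();     // bumps the epoch to N+1 and aborts the
                                           // first producer's open transaction

            try {
               first.commitTransaction();  // an operation with the old epoch
            } catch (ProducerFencedException e) {
               // "Producer attempted an operation with an old epoch. Either there is
               // a newer producer with the same transactionalId, ..."
               System.out.println("first producer fenced: " + e.getMessage());
            } finally {
               first.close();
               second.close();
            }
         }
      }

      A restored Flink job creates fresh producer instances that call initTransactions() again for its transactional IDs, which can fence a producer instance still holding an older epoch in exactly this way.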
      

      My code is:

      package com.shanjiancaofu.live.job;
      
      
      import com.alibaba.fastjson.JSON;
      import lombok.AllArgsConstructor;
      import lombok.Data;
      import lombok.NoArgsConstructor;
      import lombok.extern.slf4j.Slf4j;
      import org.apache.commons.lang.SystemUtils;
      import org.apache.flink.api.common.restartstrategy.RestartStrategies;
      import org.apache.flink.api.common.serialization.SimpleStringSchema;
      import org.apache.flink.api.common.state.ListState;
      import org.apache.flink.api.common.state.ListStateDescriptor;
      import org.apache.flink.api.common.time.Time;
      import org.apache.flink.api.common.typeinfo.TypeHint;
      import org.apache.flink.api.common.typeinfo.TypeInformation;
      import org.apache.flink.api.java.functions.KeySelector;
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.runtime.state.filesystem.FsStateBackend;
      import org.apache.flink.streaming.api.CheckpointingMode;
      import org.apache.flink.streaming.api.TimeCharacteristic;
      import org.apache.flink.streaming.api.environment.CheckpointConfig;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
      import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
      import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
      import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
      import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
      import org.apache.flink.util.Collector;
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.common.IsolationLevel;
      
      import java.util.*;
      
      @Slf4j
      public class ChargeJob1 {
      
         static class RecordScheme implements KafkaDeserializationSchema<ConsumerRecord<String, UserEvent>> {
      
            @Override
            public boolean isEndOfStream(ConsumerRecord<String, UserEvent> stringUserEventConsumerRecord) {
               return false;
            }
      
            @Override
            public ConsumerRecord<String, UserEvent> deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
               String key = null;
               UserEvent userEvent = null;
               if (consumerRecord.key() != null) {
                  key = new String(consumerRecord.key());
               }
               if (consumerRecord.value() != null) {
                  userEvent = JSON.parseObject(new String(consumerRecord.value()), UserEvent.class);
               }
               return new ConsumerRecord<>(
                  consumerRecord.topic(),
                  consumerRecord.partition(),
                  consumerRecord.offset(),
                  consumerRecord.timestamp(),
                  consumerRecord.timestampType(),
                  consumerRecord.checksum(),
                  consumerRecord.serializedKeySize(),
                  consumerRecord.serializedValueSize(),
                  key, userEvent);
            }
      
            @Override
            public TypeInformation<ConsumerRecord<String, UserEvent>> getProducedType() {
               return TypeInformation.of(new TypeHint<ConsumerRecord<String, UserEvent>>() {
               });
            }
         }
      
      
         public static void main(String[] args) throws Exception {
            Configuration configuration = new Configuration();
      
            if (args != null) {
               // pass the program arguments on as a global parameter
               configuration.setString("args", String.join(" ", args));
            }
      
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
            env.setParallelism(3);
            env.setRestartStrategy(new RestartStrategies.FailureRateRestartStrategyConfiguration(1, Time.minutes(5), Time.seconds(10)));
            env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
            // local checkpoint configuration
            env.enableCheckpointing(1000 * 60L);
      
            CheckpointConfig checkpointConfig = env.getCheckpointConfig();
            checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            // minimum pause between two consecutive checkpoints
            checkpointConfig.setMinPauseBetweenCheckpoints(1000 * 5L);
            checkpointConfig.setCheckpointTimeout(1000 * 60L);
            // retain the checkpoint data once the job is cancelled, so the job can be restored from a chosen checkpoint later
            checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
      
      
            // note: both branches currently configure the same checkpoint path
            if (SystemUtils.IS_OS_WINDOWS) {
               env.setStateBackend(new FsStateBackend("file:///soft/opt/checkpoint"));
            } else {
               env.setStateBackend(new FsStateBackend("file:///soft/opt/checkpoint"));
            }
            // 2. read the source data
            // Kafka sink configuration
      
            Properties sinkProperties = new Properties();
            sinkProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.17.81:9092,192.168.17.82:9092,192.168.17.83:9092");
      
            sinkProperties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 + "");
            sinkProperties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
            sinkProperties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
            sinkProperties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
      
            sinkProperties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "user-event-processd-tr");
            sinkProperties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.RoundRobinPartitioner");
      
      
            FlinkKafkaProducer<String> stringFlinkKafkaProducer = new FlinkKafkaProducer<String>("dsp-user-event-processd",
               new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), sinkProperties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
      
            Properties consumerProp = new Properties();
            consumerProp.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.17.81:9092,192.168.17.82:9092,192.168.17.83:9092");
            consumerProp.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "dsp-user-event-group");
            consumerProp.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            consumerProp.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            consumerProp.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            consumerProp.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
            consumerProp.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
      
            env.addSource(new FlinkKafkaConsumer<>("dsp-user-event", new RecordScheme(), consumerProp))
               .name("dsp-user-event-source")
      
               .keyBy(new KeySelector<ConsumerRecord<String, UserEvent>, Long>() {
                  @Override
                  public Long getKey(ConsumerRecord<String, UserEvent> value) throws Exception {
                     return value.value().getUserId();
                  }
               })
      
               .process(new ChargeProcess()).setParallelism(3)
               .map(obj -> obj)
               .addSink(stringFlinkKafkaProducer)
               .name("dsp-user-event-sink").uid("dsp-user-event-sink-uid");
            env.execute("chargeJob");
         }
      
         public static class ChargeProcess extends KeyedProcessFunction<Long, ConsumerRecord<String, UserEvent>, String> {
      
            ListState<String> listState = null;
      
            @Override
            public void open(Configuration parameters) throws Exception {
               super.open(parameters);
      
               listState = getRuntimeContext().getListState(new ListStateDescriptor<String>("ad-ip", String.class));
      
            }
      
      
            @Override
            public void close() throws Exception {
               super.close();
            }
      
            @Override
            public void processElement(ConsumerRecord<String, UserEvent> stringUserEventConsumerRecord,
                                 Context context,
                                 Collector<String> collector) throws Exception {
               UserEvent value = stringUserEventConsumerRecord.value();
               Iterable<String> strings = listState.get();
               Iterator<String> iterator = strings.iterator();
               List<String> objects = new ArrayList<>();
               iterator.forEachRemaining(objects::add);
               objects.add(value.getUserId() + "");
               try {
                  // an idempotent operation is sufficient here; placeholder for the real processing
                  boolean result = true;
      
                  if (result) {
                     listState.update(objects);
                     collector.collect(JSON.toJSONString(value));
                  }
                  log.info(Thread.currentThread().getId() + ": processing: " + value.getEventMd5() + " " + listState.get().toString());
                  System.out.println(Thread.currentThread().getId() + ": processing: " + value.getEventMd5() + " " + listState.get().toString());
               } catch (Exception e) {
                  System.out.println(e);
               }
            }
         }
      
         @Data
         @AllArgsConstructor
         @NoArgsConstructor
         public static class UserEvent {
            private Long userId;
            private String eventMd5;
      
         }
      }
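
      One more observation on the configuration above: transaction.timeout.ms is set to 60 000 ms (1000 * 60), the same length as the checkpoint interval, so a transaction still open when the job is cancelled can be expired by the broker before the restore finishes, which matches the "transaction has been expired by the broker" branch of the error. The Flink Kafka connector documentation recommends a transaction timeout well above the maximum expected downtime. A sketch of adjusted sink properties, as a drop-in change inside main() (the 15-minute value is an assumption, not a confirmed fix; it must stay at or below the broker's transaction.max.timeout.ms, which also defaults to 15 minutes):

      Properties sinkProperties = new Properties();
      sinkProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
         "192.168.17.81:9092,192.168.17.82:9092,192.168.17.83:9092");
      // give each transaction more headroom than one checkpoint interval
      // plus the time the job may spend down between cancel and restore
      sinkProperties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
         String.valueOf(15 * 60 * 1000));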
      
      

       


            People

              Assignee: Unassigned
              Reporter: meetsong (s32967326@163.com)
