Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Duplicate
-
1.13.1
-
None
-
None
-
flink:flink-1.13.1
kafka: _2.12-2.5.0
java: 1.8.0_161
Description
While testing Flink exactly-once semantics (EOS) with a Kafka sink, I first cancelled the job with the Cancel button in the Flink web UI, then resubmitted it from the console with the following command:
bin/flink run -n -c com.shanjiancaofu.live.job.ChargeJob -s file:/soft/opt/checkpoint/072c0a72343c6e1f06b9bd37c5147cc0/chk-1/_metadata ./ad-live-process-0.11-jar-with-dependencies.jar
About 10 seconds later, the job throws an exception:
Caused by: org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
My code is:
package com.shanjiancaofu.live.job; import com.alibaba.fastjson.JSON; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.SystemUtils; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper; import org.apache.flink.util.Collector; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.IsolationLevel; import java.util.*; @Slf4j public class ChargeJob1 { static class RecordScheme implements KafkaDeserializationSchema<ConsumerRecord<String, UserEvent>> { @Override public boolean isEndOfStream(ConsumerRecord<String, UserEvent> stringUserEventConsumerRecord) { return 
false; } @Override public ConsumerRecord<String, UserEvent> deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception { String key = null; UserEvent UserEvent = null; if (consumerRecord.key() != null) { key = new String(consumerRecord.key()); } if (consumerRecord.value() != null) { UserEvent = JSON.parseObject(new String(consumerRecord.value()), UserEvent.class); } return new ConsumerRecord<>( consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), consumerRecord.timestamp(), consumerRecord.timestampType(), consumerRecord.checksum(), consumerRecord.serializedKeySize(), consumerRecord.serializedValueSize(), key, UserEvent); } @Override public TypeInformation<ConsumerRecord<String, UserEvent>> getProducedType() { return TypeInformation.of(new TypeHint<ConsumerRecord<String, UserEvent>>() { }); } } public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); if (args != null) { // 传递全局参数 configuration.setString("args", String.join(" ", args)); } StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3); env.setRestartStrategy(new RestartStrategies.FailureRateRestartStrategyConfiguration(1, Time.minutes(5), Time.seconds(10))); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //本地checkpoint配置 env.enableCheckpointing(1000 * 60L); CheckpointConfig checkpointConfig = env.getCheckpointConfig(); checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 每个检查点的间隔 checkpointConfig.setMinPauseBetweenCheckpoints(1000 * 5L); checkpointConfig.setCheckpointTimeout(1000 * 60L); // 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); if (SystemUtils.IS_OS_WINDOWS) { env.setStateBackend(new FsStateBackend("file:///soft/opt/checkpoint")); } else { env.setStateBackend(new 
FsStateBackend("file:///soft/opt/checkpoint")); } // 2. 读取数据 //kafka sink配置//kafka sink配置 Properties sinkProperties = new Properties(); sinkProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.17.81:9092,192.168.17.82:9092,192.168.17.83:9092"); sinkProperties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 + ""); sinkProperties.setProperty(ProducerConfig.ACKS_CONFIG, "all"); sinkProperties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); sinkProperties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); sinkProperties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "user-event-processd-tr"); sinkProperties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.RoundRobinPartitioner"); FlinkKafkaProducer<String> stringFlinkKafkaProducer = new FlinkKafkaProducer<String>("dsp-user-event-processd", new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), sinkProperties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE); Properties consumerProp = new Properties(); consumerProp.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.17.81:9092,192.168.17.82:9092,192.168.17.83:9092"); consumerProp.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "dsp-user-event-group"); consumerProp.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); consumerProp.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); consumerProp.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); consumerProp.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT)); consumerProp.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor"); env.addSource(new FlinkKafkaConsumer<>("dsp-user-event", new RecordScheme(), consumerProp)) 
.name("dsp-user-event-source") .keyBy(new KeySelector<ConsumerRecord<String, UserEvent>, Long>() { @Override public Long getKey(ConsumerRecord<String, UserEvent> value) throws Exception { return value.value().getUserId(); } }) .process(new ChargeProcess()).setParallelism(3) .map(obj -> obj) .addSink(stringFlinkKafkaProducer) .name("dsp-user-event-sink").uid("dsp-user-event-sink-uid"); env.execute("chargeJob"); } public static class ChargeProcess extends KeyedProcessFunction<Long, ConsumerRecord<String, UserEvent>, String> { ListState<String> listState = null; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); listState = getRuntimeContext().getListState(new ListStateDescriptor<String>("ad-ip", String.class)); } @Override public void close() throws Exception { super.close(); } @Override public void processElement(ConsumerRecord<String, UserEvent> stringUserEventConsumerRecord, Context context, Collector<String> collector) throws Exception { UserEvent value = stringUserEventConsumerRecord.value(); Iterable<String> strings = listState.get(); Iterator<String> iterator = strings.iterator(); List<String> objects = new ArrayList<>(); iterator.forEachRemaining(objects::add); objects.add(value.getUserId() + ""); try { // 幂等操作即可, self process boolean result = true; if (result) { listState.update(objects); collector.collect(JSON.toJSONString(value)); } log.info(Thread.currentThread().getId() + ": 处理:" + value.getEventMd5() + " " + listState.get().toString()); System.out.println(Thread.currentThread().getId() + ": 处理:" + value.getEventMd5() + " " + listState.get().toString()); } catch (Exception e) { System.out.println(e); } } } @Data @AllArgsConstructor @NoArgsConstructor public static class UserEvent { private Long userId; private String eventMd5; } }
Attachments
Issue Links
- duplicates
-
FLINK-23509 FlinkKafkaInternalProducer overrides static final ProducerIdAndEpoch#NONE during transaction recovery (fails)
- Resolved