Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
1.15.3
-
None
-
None
Description
I'm using the Kafka connector and set a consumption boundary on the source with setBounded. When my job runs normally (with no failure), the bound is respected and the job finishes. However, when the job fails and I restore it from the checkpoint taken before the failure, the job never completes and stays in the running state, even though the log shows that data has been consumed up to the boundary I set. I don't know whether there is a problem with my usage. Here is part of my code:
String topicName = "jw-test-kafka-w-offset-002";
Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
offsets.put(new TopicPartition(topicName, 0), 6L);
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("xxx:9092")
        .setProperties(properties)
        .setTopics(topicName)
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setBounded(OffsetsInitializer.offsets(offsets))
        .build();
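
To make the expected behaviour concrete, here is a minimal plain-Java sketch of the condition I would expect to hold after the restore. It is not Flink's implementation. The class name BoundedOffsetCheck, the method allPartitionsFinished, and the "topic-partition" string keys are hypothetical and only stand in for TopicPartition. It also assumes the stopping offset is exclusive, so a partition counts as finished once the next offset to read is greater than or equal to the bound.

```java
import java.util.HashMap;
import java.util.Map;

public class BoundedOffsetCheck {

    /**
     * Returns true when every bounded partition has a restored position
     * (next offset to read) at or past its stopping offset.
     * Hypothetical helper for illustration; not part of the Flink API.
     */
    static boolean allPartitionsFinished(Map<String, Long> stoppingOffsets,
                                         Map<String, Long> restoredPositions) {
        for (Map.Entry<String, Long> bound : stoppingOffsets.entrySet()) {
            Long position = restoredPositions.get(bound.getKey());
            // A partition with no known position, or one still below the bound, is unfinished.
            if (position == null || position < bound.getValue()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Same bound as in the report: partition 0 stops at offset 6 (assumed exclusive).
        Map<String, Long> stoppingOffsets = new HashMap<>();
        stoppingOffsets.put("jw-test-kafka-w-offset-002-0", 6L);

        // Position restored from the checkpoint: offsets 0..5 were already consumed.
        Map<String, Long> restoredPositions = new HashMap<>();
        restoredPositions.put("jw-test-kafka-w-offset-002-0", 6L);
        System.out.println(allPartitionsFinished(stoppingOffsets, restoredPositions)); // prints "true"

        // Position restored from an earlier checkpoint: the bound is not reached yet.
        restoredPositions.put("jw-test-kafka-w-offset-002-0", 5L);
        System.out.println(allPartitionsFinished(stoppingOffsets, restoredPositions)); // prints "false"
    }
}
```

In the first case I would expect the restored job to finish right away, since nothing is left to read below the bound. That is the situation in which my job keeps running.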