Details
Type: Bug
Status: Open
Priority: P3
Resolution: Unresolved
Description
I use KafkaIO as a source, and I would like consumed offsets to be stored in Kafka (in the __consumer_offsets topic).
I'm configuring the Kafka reader with
    .updateConsumerProperties(ImmutableMap.of(
        ConsumerConfig.GROUP_ID_CONFIG, "my-group",
        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, java.lang.Boolean.TRUE,
        ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10" // doesn't work with default value either (5000ms)
    ))
However, the offsets are not stored in Kafka: nothing appears in __consumer_offsets, and the next job restarts at the latest offset.
I can't find in the code where the offsets are supposed to be committed.
I tried adding a manual commit in the consumerPollLoop() method, and with that change the offsets are committed:
    private void consumerPollLoop() {
      // Read in a loop and enqueue the batch of records, if any, to availableRecordsQueue
      while (!closed.get()) {
        try {
          ConsumerRecords<byte[], byte[]> records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
          if (!records.isEmpty() && !closed.get()) {
            availableRecordsQueue.put(records); // blocks until dequeued.
            // Manual commit
            consumer.commitSync();
          }
        } catch (InterruptedException e) {
          LOG.warn("{}: consumer thread is interrupted", this, e); // not expected
          break;
        } catch (WakeupException e) {
          break;
        }
      }
      LOG.info("{}: Returning from consumer pool loop", this);
    }
Is this a bug in KafkaIO or am I misconfiguring something?
Disclaimer: I'm currently using KafkaIO in Dataflow, via the backport in the Dataflow SDK (https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java), but I'm confident the code is similar for this case.
Edit: I found the correct method where KafkaIO is supposed to commit at the end of a batch. I'm currently testing it and will be able to open a pull request soon:
    // KafkaCheckpointMark.java

    /**
     * Optional consumer that will be used to commit offsets into Kafka when finalizeCheckpoint() is called
     */
    @Nullable
    private final Consumer consumer;

    public KafkaCheckpointMark(List<PartitionMark> partitions, @Nullable Consumer consumer) {
      this.partitions = partitions;
      this.consumer = consumer;
    }

    /**
     * Commit synchronously into Kafka offsets that have been passed downstream.
     */
    @Override
    public void finalizeCheckpoint() throws IOException {
      if (consumer == null) {
        LOG.warn("finalizeCheckpoint(): no consumer provided, will not commit anything.");
        return;
      }
      if (partitions.size() == 0) {
        LOG.info("finalizeCheckpoint(): nothing to commit to Kafka.");
        return;
      }

      final Map<TopicPartition, OffsetAndMetadata> offsets = newHashMap();
      String committedOffsets = "";
      for (PartitionMark partition : partitions) {
        TopicPartition topicPartition = partition.getTopicPartition();
        offsets.put(topicPartition, new OffsetAndMetadata(partition.offset));
        committedOffsets += topicPartition.topic() + "-" + topicPartition.partition() + ":" + partition.offset + ",";
      }
      final String printableOffsets = committedOffsets.substring(0, committedOffsets.length() - 1);

      try {
        consumer.commitSync(offsets);
        LOG.info("finalizeCheckpoint(): committed Kafka offsets {}", printableOffsets);
      } catch (Exception e) {
        LOG.error("finalizeCheckpoint(): {} when trying to commit Kafka offsets [{}]", e.getClass().getSimpleName(), printableOffsets);
      }
    }
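For anyone who wants to exercise the offset bookkeeping from finalizeCheckpoint() without a Kafka broker, here is a minimal plain-Java sketch of that step. It is not the KafkaIO code: OffsetCommitSketch, its PartitionMark stand-in, and the helpers offsetsToCommit() and printable() are hypothetical names made up for illustration, and a "topic-partition" string key stands in for Kafka's TopicPartition. It builds the same per-partition offset map and log string as above, using a joining collector rather than the trailing-comma substring.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class OffsetCommitSketch {

    /** Hypothetical stand-in for KafkaCheckpointMark.PartitionMark (no Kafka dependency). */
    static final class PartitionMark {
        final String topic;
        final int partition;
        final long offset;

        PartitionMark(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Maps "topic-partition" to the offset recorded in each mark, in input order. */
    static Map<String, Long> offsetsToCommit(List<PartitionMark> marks) {
        Map<String, Long> offsets = new LinkedHashMap<>();
        for (PartitionMark mark : marks) {
            offsets.put(mark.topic + "-" + mark.partition, mark.offset);
        }
        return offsets;
    }

    /** Renders the map as "topic-partition:offset,..." for logging; empty map gives "". */
    static String printable(Map<String, Long> offsets) {
        return offsets.entrySet().stream()
            .map(entry -> entry.getKey() + ":" + entry.getValue())
            .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        List<PartitionMark> marks = Arrays.asList(
            new PartitionMark("events", 0, 42L),
            new PartitionMark("events", 1, 7L));
        // prints "events-0:42,events-1:7"
        System.out.println(printable(offsetsToCommit(marks)));
    }
}
```

One thing to check when wiring this into a real commit: Kafka treats a committed offset as the position of the next record to read. If PartitionMark.offset holds the last consumed offset, the committed value would need to be offset + 1. I have not verified which of the two it holds in the backport.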