Beam / BEAM-990

KafkaIO does not commit offsets to Kafka

Details

    • Type: Bug
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: io-java-kafka

    Description

      I use KafkaIO as a source, and I would like consumed offsets to be stored in Kafka (in the __consumer_offsets topic).

      I'm configuring the Kafka reader with:

      .updateConsumerProperties(ImmutableMap.of(
              ConsumerConfig.GROUP_ID_CONFIG, "my-group",
              ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE,
              ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10" // also fails with the default interval (5000 ms)
      ))

      However, the offsets are not stored in Kafka: nothing appears in __consumer_offsets, and the next job restarts at the latest offset.
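      The restart-at-latest symptom follows from how a Kafka consumer picks its starting position: if its group has a committed offset for a partition, it resumes there; otherwise it falls back to the auto.offset.reset policy, whose default is "latest". A minimal stdlib-only model of that rule (the class and method are hypothetical, not Kafka or KafkaIO API):

```java
public class StartOffsetModel {

    // Hypothetical model of the Kafka consumer's start-position rule; not a real Kafka API.
    static long resolveStartOffset(Long committedOffset, String autoOffsetReset,
                                   long logStartOffset, long logEndOffset) {
        if (committedOffset != null) {
            return committedOffset; // resume where the consumer group left off
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset;
            case "latest":
                return logEndOffset; // Kafka's default policy: skip everything already in the log
            default:
                // "none": the real consumer throws NoOffsetForPartitionException instead.
                throw new IllegalStateException("no committed offset and reset policy " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // Partition holds offsets 0..99, so the log end offset is 100.
        System.out.println(resolveStartOffset(null, "latest", 0L, 100L)); // nothing committed: 100
        System.out.println(resolveStartOffset(42L, "latest", 0L, 100L));  // committed offset wins: 42
    }
}
```

With nothing in __consumer_offsets for "my-group", each new job therefore starts at the log end offset, which matches the behavior described above.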

      I can't find where in the code the offsets are supposed to be committed.

      I tried adding a manual commit in the consumerPollLoop() method, and it works: the offsets are committed.

      private void consumerPollLoop() {
          // Read in a loop and enqueue the batch of records, if any, to availableRecordsQueue
          while (!closed.get()) {
              try {
                  ConsumerRecords<byte[], byte[]> records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
                  if (!records.isEmpty() && !closed.get()) {
                      availableRecordsQueue.put(records); // blocks until dequeued.
                      // Manual commit: commitSync() with no arguments commits the offsets
                      // returned by the last poll() for all of the consumer's partitions.
                      consumer.commitSync();
                  }
              } catch (InterruptedException e) {
                  LOG.warn("{}: consumer thread is interrupted", this, e); // not expected
                  break;
              } catch (WakeupException e) {
                  break;
              }
          }

          LOG.info("{}: Returning from consumer poll loop", this);
      }
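      One consequence of committing right after availableRecordsQueue.put(records): the committed offset covers records that have only been handed off, not yet processed or checkpointed downstream. If the job dies in that window, a restart from the committed offset skips them (at-most-once delivery). A small stdlib-only simulation of that window; the names, batch size, and crash point are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

public class CommitOnEnqueueModel {

    /**
     * Hypothetical simulation: records 0..total-1 are polled in batches of batchSize, the
     * offset is committed as soon as each batch is enqueued, and processing stops (a crash)
     * after crashAfter records. Returns the offsets never processed after a restart that
     * resumes from the committed offset.
     */
    static List<Long> lostOnRestart(long total, int batchSize, long crashAfter) {
        long committed = 0; // next offset to read, as stored in Kafka
        long processed = 0; // records handled downstream before the crash
        for (long start = 0; start < total && processed < crashAfter; start += batchSize) {
            long end = Math.min(start + batchSize, total);
            committed = end;                        // commitSync() right after enqueueing
            processed = Math.min(end, crashAfter);  // downstream catches up, or crashes mid-batch
        }
        List<Long> lost = new ArrayList<>();
        for (long offset = processed; offset < committed; offset++) {
            lost.add(offset);
        }
        return lost;
    }

    public static void main(String[] args) {
        // 10 records, batches of 4, crash after 6 processed: batch [4, 8) was
        // committed but only offsets 4 and 5 were processed.
        System.out.println(lostOnRestart(10, 4, 6)); // [6, 7]
    }
}
```

Because put() blocks until the batch is dequeued, at most one batch runs ahead of processing in this model; committing from finalizeCheckpoint() instead ties the committed offset to what the runner has durably checkpointed.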
      

      Is this a bug in KafkaIO, or am I misconfiguring something?

      Disclaimer: I'm currently running KafkaIO on Dataflow, using the backport in the Dataflow SDK (https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java), but I'm confident the code is similar for this case.

      Edit: I found the method where KafkaIO is supposed to commit offsets at the end of a batch: finalizeCheckpoint() in KafkaCheckpointMark. I'm currently testing a change to it and should be able to open a pull request soon:

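      // KafkaCheckpointMark.java (excerpt)
      import java.io.IOException;
      import java.util.HashMap;
      import java.util.List;
      import java.util.Map;
      import javax.annotation.Nullable;
      import org.apache.kafka.clients.consumer.Consumer;
      import org.apache.kafka.clients.consumer.OffsetAndMetadata;
      import org.apache.kafka.common.TopicPartition;

          // ... inside class KafkaCheckpointMark ...

          /**
           * Optional consumer used to commit offsets into Kafka when finalizeCheckpoint() is called.
           * Transient, so it is not serialized as part of the checkpoint.
           */
          @Nullable
          private final transient Consumer<?, ?> consumer;

          public KafkaCheckpointMark(List<PartitionMark> partitions, @Nullable Consumer<?, ?> consumer) {
              this.partitions = partitions;
              this.consumer = consumer;
          }

          /**
           * Synchronously commits to Kafka the offsets that have been passed downstream.
           */
          @Override
          public void finalizeCheckpoint() throws IOException {
              if (consumer == null) {
                  LOG.warn("finalizeCheckpoint(): no consumer provided, will not commit anything.");
                  return;
              }

              Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
              StringBuilder printableOffsets = new StringBuilder();
              for (PartitionMark partition : partitions) {
                  // Assumes partition.offset is the last consumed offset, negative if nothing has
                  // been consumed yet (the reader resumes at offset + 1). Kafka expects the
                  // committed offset to be the next offset to read, hence the + 1.
                  if (partition.offset < 0) {
                      continue;
                  }
                  TopicPartition topicPartition = partition.getTopicPartition();
                  long nextOffset = partition.offset + 1;
                  offsets.put(topicPartition, new OffsetAndMetadata(nextOffset));
                  if (printableOffsets.length() > 0) {
                      printableOffsets.append(',');
                  }
                  printableOffsets.append(topicPartition).append(':').append(nextOffset);
              }
              if (offsets.isEmpty()) {
                  LOG.info("finalizeCheckpoint(): nothing to commit to Kafka.");
                  return;
              }

              try {
                  // KafkaConsumer is not thread-safe: if the poll loop is using this consumer on
                  // another thread, commitSync() can throw ConcurrentModificationException.
                  consumer.commitSync(offsets);
                  LOG.info("finalizeCheckpoint(): committed Kafka offsets [{}]", printableOffsets);
              } catch (Exception e) {
                  LOG.error("finalizeCheckpoint(): failed to commit Kafka offsets [{}]", printableOffsets, e);
              }
          }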
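      KafkaConsumer is not safe for multi-threaded access, and the poll loop runs on its own consumer thread, so calling commitSync() directly from finalizeCheckpoint() can race with poll(). One way around this is to let finalizeCheckpoint() only record the offsets and have the poll-loop thread perform the commit between polls. A minimal stdlib-only sketch of that hand-off; OffsetCommitter stands in for KafkaConsumer.commitSync(Map), and every name here is hypothetical rather than KafkaIO or Kafka API:

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class CommitHandOff {

    /** Hypothetical stand-in for KafkaConsumer.commitSync(Map<TopicPartition, OffsetAndMetadata>). */
    interface OffsetCommitter {
        void commitSync(Map<String, Long> offsets);
    }

    // Most recent offsets requested by finalizeCheckpoint(). A newer request replaces an older
    // one, assuming every checkpoint lists all partitions and checkpoints finalize in order.
    private final AtomicReference<Map<String, Long>> pendingCommit = new AtomicReference<>();
    private final OffsetCommitter committer;

    CommitHandOff(OffsetCommitter committer) {
        this.committer = committer;
    }

    /** Runner thread: only records the offsets and never touches the consumer. */
    void finalizeCheckpoint(Map<String, Long> offsets) {
        pendingCommit.set(offsets);
    }

    /** Poll-loop thread: commits pending offsets between poll() calls, keeping the consumer single-threaded. */
    void commitPendingOffsets() {
        Map<String, Long> offsets = pendingCommit.getAndSet(null);
        if (offsets != null && !offsets.isEmpty()) {
            committer.commitSync(offsets);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch committed = new CountDownLatch(1);
        CommitHandOff handOff = new CommitHandOff(offsets -> {
            System.out.println(Thread.currentThread().getName() + " committed " + offsets);
            committed.countDown();
        });

        Thread pollThread = new Thread(() -> {
            while (committed.getCount() > 0) {
                // In KafkaIO, consumer.poll(...) and the enqueueing of records would happen here.
                handOff.commitPendingOffsets();
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }, "poll-loop");
        pollThread.start();

        // Runner thread finalizes a checkpoint: the next offset to read on events-0 is 43.
        handOff.finalizeCheckpoint(Collections.singletonMap("events-0", 43L));
        committed.await();
        pollThread.join();
    }
}
```

Running main prints `poll-loop committed {events-0=43}`, showing the commit executes on the poll thread rather than on the thread that finalized the checkpoint.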
      

          People

            Assignee: Unassigned
            Reporter: Alban Perillat-Merceroz (alban@perillat.org)
            Votes: 0
            Watchers: 6