  1. Beam
  2. BEAM-7047

Direct Runner loses checkpoint marks when committing offsets to Apache Kafka partitions using KafkaIO

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.9.0
    • Fix Version/s: None
    • Labels:
      None

      Description

      On our project we are using Apache Kafka as a source for most of our Apache Beam pipelines.

      We would like to use the manual offset commit functionality implemented by KafkaIO and enabled by the commitOffsetsInFinalize option.

      We have also written several tests that represent and document this functionality and that should run during our CI process using the Direct Runner.

      However, we ran into issues while implementing the tests: on the Direct Runner, not all checkpoint marks (which in the case of KafkaIO represent partition offsets) are committed.

      I've created a sample project, attached to this JIRA ticket both as an external link and as a source attachment, to showcase the issue.

      The result of this test execution is not deterministic. When it fails, a representative stack trace is as follows:

      org.awaitility.core.ConditionTimeoutException: Condition with alias 'sent raw messages are read and offsets are committed' didn't complete within 3 minutes because lambda expression in com.marknorkin.beam.directrunner.sample.ParserEndToEndFlowCommitOffsetsTest: expected <{raw_topic-0=10, raw_topic-1=10, raw_topic-2=10}> but was <{raw_topic-1=10, raw_topic-0=10}>.
      
      	at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:145)
      	at org.awaitility.core.AbstractHamcrestCondition.await(AbstractHamcrestCondition.java:89)
      	at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:902)
      	at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:645)
      	at com.marknorkin.beam.directrunner.sample.ParserEndToEndFlowCommitOffsetsTest.shouldTestOffsetCommit(ParserEndToEndFlowCommitOffsetsTest.java:138)
      

       

      This issue is probably not specific to KafkaIO, since the Direct Runner finalizes checkpoint marks through the general CheckpointMark interface.
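
To make the symptom concrete, here is a minimal, self-contained sketch of how a lost checkpoint mark would produce the map seen in the failure message. It is not Direct Runner or KafkaIO code: the nested CheckpointMark interface is a hypothetical stand-in that only mirrors the shape of Beam's finalizeCheckpoint() callback, and PartitionMark, run, and the one-mark-per-partition layout are assumptions made purely for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CheckpointFinalizationSketch {

    /** Hypothetical stand-in for the finalize callback of a checkpoint mark. */
    interface CheckpointMark {
        void finalizeCheckpoint() throws IOException;
    }

    /** Hypothetical mark for one partition: finalizing it "commits" the offset. */
    static final class PartitionMark implements CheckpointMark {
        private final String partition;
        private final long offset;
        private final Map<String, Long> committed;

        PartitionMark(String partition, long offset, Map<String, Long> committed) {
            this.partition = partition;
            this.offset = offset;
            this.committed = committed;
        }

        @Override
        public void finalizeCheckpoint() {
            // In this model, a commit happens only if the runner calls finalize.
            committed.put(partition, offset);
        }
    }

    /**
     * Creates one mark per partition and finalizes all of them except the one
     * at droppedIndex (pass -1 to finalize every mark). Returns the committed
     * offsets keyed by partition name.
     */
    static Map<String, Long> run(int partitions, long offset, int droppedIndex) {
        Map<String, Long> committed = new TreeMap<>();
        List<CheckpointMark> marks = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            marks.add(new PartitionMark("raw_topic-" + p, offset, committed));
        }
        for (int i = 0; i < marks.size(); i++) {
            if (i == droppedIndex) {
                continue; // simulated lost mark: finalize is never invoked
            }
            try {
                marks.get(i).finalizeCheckpoint();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        System.out.println("all finalized: " + run(3, 10, -1));
        System.out.println("one mark lost: " + run(3, 10, 2));
    }
}
```

With every mark finalized, the sketch reports offsets for all three partitions. With the mark at index 2 skipped, raw_topic-2 is missing from the result, which matches the shape of the failed assertion above. Because the commit is driven entirely through the generic callback, any source that relies on finalization would be affected in the same way under this model.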

            People

            • Assignee:
              Unassigned
            • Reporter:
              marknorkin Mark Norkin
            • Votes:
              0
            • Watchers:
              2
