SAMZA-754

Race condition between coordinator stream producer and coordinator stream consumer in JobRunner

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.0
    • Fix Version/s: 0.10.0
    • Component/s: None
    • Labels: None

      Description

      Removing the checkpoint-related configuration from the samza-test/.../integration tests seems to cause integration test failures when multiple tests run in a row. This might be related to coordinator stream migration.

      1. SAMZA-754-0.patch
        5 kB
        Yi Pan (Data Infrastructure)

        Activity

        rzuljevic Robert Žuljević added a comment -

        Wasn't coordinator stream migration patched in on 10th August?

        In any case, I tried running the Zopkio integration tests from current master. Since the node manager does not shut down within 5 seconds, a script is called to kill all processes, but it only throws NotImplementedError at deployer.py:130.

        nickpan47 Yi Pan (Data Infrastructure) added a comment -

        Robert Žuljević, sorry, I should have been more specific. The integration tests I referred to are the stateful task tests in the samza-test module. They are executed in the gradle build, not in Zopkio. Specifically, the checkpoint factory config should no longer have any effect and should be removed in 0.10.0. However, removing it from the stateful tests' configuration causes errors. That is what should be fixed, not the Zopkio test. I have updated the JIRA summary to reflect the issue more precisely.

        navina Navina Ramesh added a comment -

        Isn't this related to SAMZA-671, Yi Pan (Data Infrastructure)?

        nickpan47 Yi Pan (Data Infrastructure) added a comment -

        Yeah, I think so.

        nickpan47 Yi Pan (Data Infrastructure) added a comment -

        It turns out that this is related to a race condition between the CoordinatorStreamSystemProducer and the CoordinatorStreamSystemConsumer in the JobRunner code:

            if (resetJobConfig) {
              info("Storing config in coordinator stream.")
              coordinatorSystemProducer.register(JobRunner.SOURCE)
              coordinatorSystemProducer.start
              coordinatorSystemProducer.writeConfig(JobRunner.SOURCE, config)
            }
            info("Loading old config from coordinator stream.")
            coordinatorSystemConsumer.register
            coordinatorSystemConsumer.start
            coordinatorSystemConsumer.bootstrap
            coordinatorSystemConsumer.stop
        

        The race can occur when the topic is empty and the coordinator system producer writes the config to the Kafka broker as the first message. If the coordinator system consumer then immediately does a fetchOffset() call on the topic, the broker may return "null" as the starting offset because the first message has not been completely committed yet. This causes the NPE in CoordinatorSystemConsumer.register(). The right fix is to treat this case as the "oldest" starting offset case instead of null.
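
        A rough illustration of that idea, as a hypothetical sketch rather than the actual patch: `StartPosition`, `Oldest`, `At`, and `resolveStart` are made-up names for this example and are not Samza or Kafka APIs. A null starting offset is mapped to the oldest position instead of being passed along:

```scala
// Hypothetical sketch: none of these names come from Samza or Kafka.
object StartingOffsetSketch {
  // Where a consumer should begin reading.
  sealed trait StartPosition
  case object Oldest extends StartPosition               // read from the oldest available message
  final case class At(offset: String) extends StartPosition

  // Wrap the possibly-null fetched offset in Option so that a null
  // falls back to Oldest instead of triggering a NullPointerException.
  def resolveStart(fetchedOffset: String): StartPosition =
    Option(fetchedOffset) match {
      case Some(offset) => At(offset)
      case None         => Oldest
    }

  def main(args: Array[String]): Unit = {
    println(resolveStart(null)) // offset not yet available: fall back to oldest
    println(resolveStart("42")) // offset available: start at it
  }
}
```

        The only point of the sketch is that the null case is handled explicitly at the boundary, so callers never see a null offset.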

        nickpan47 Yi Pan (Data Infrastructure) added a comment -

        Noticed that Chris Riccomini has reported the same issue here: SAMZA-542.

        navina Navina Ramesh added a comment -

        Yeah. This issue is not really specific to the coordinator stream. I think that is how things work in OffsetManager.

        nickpan47 Yi Pan (Data Infrastructure) added a comment -

        RB available: https://reviews.apache.org/r/40421/

        navina Navina Ramesh added a comment -

        Left one comment in the RB. Otherwise, looks good. +1

        nickpan47 Yi Pan (Data Infrastructure) added a comment -

        Merged and submitted. Thanks for the review!


          People

          • Assignee: nickpan47 Yi Pan (Data Infrastructure)
          • Reporter: nickpan47 Yi Pan (Data Infrastructure)