Samza: SAMZA-289

Job fails with invalid checkpoint topic partition count


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.6.0
    • Fix Version/s: 0.7.0
    • Component/s: kafka
    • Labels: None

    Description

      We have been seeing intermittent failures lately, with messages like this:

      org.apache.samza.checkpoint.kafka.KafkaCheckpointException: Checkpoint topic validation failed for topic __samza_checkpoint_samza-perf-store-all-calls_i001 because partition count 8 did not match expected partition count 64.
      

      This causes the entire job to fail. It is triggered only the first time a Samza job runs in a cluster (before its checkpoint topic exists), and only for jobs with an input stream that has more partitions than the broker's default partition count (num.partitions).

      I believe there is a race condition in KafkaCheckpointManager. Right now, KafkaCheckpointManager.createTopic runs only in the container that owns the first partition (partition 0):

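        def start {
          // Only the container that owns partition 0 creates the checkpoint topic.
          if (partitions.contains(new Partition(0))) {
            createTopic
          }

          // Every container validates, possibly before partition 0's container has created the topic.
          validateTopic
        }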
      

      If a container that does not own partition 0 starts before the container that does, it skips createTopic and runs only validateTopic. validateTopic fetches TopicMetadata from Kafka. If the checkpoint topic does not exist yet, I believe the broker reports that, but also creates the topic in the background (when auto.create.topics.enable is on). The auto-created topic gets the broker's default partition count (num.partitions), not the partition count the job expects.
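To make the ordering dependence concrete, here is a small in-memory simulation. It is a sketch under stated assumptions, not Samza or Kafka code: `FakeBroker`, `topicMetadata`, `validate`, `startOld`, and `run` are hypothetical names, and the broker is assumed to auto-create a missing topic with its default partition count whenever metadata is requested for it.

```scala
import scala.collection.mutable

object RaceDemo {
  // Hypothetical in-memory stand-in for a Kafka broker; not a real Kafka API.
  final class FakeBroker(numPartitionsDefault: Int) {
    private val topics = mutable.Map.empty[String, Int]

    // Models a metadata request with auto-creation on: a missing topic is reported
    // as absent, but is created in the background with the default partition count.
    def topicMetadata(topic: String): Option[Int] = {
      val existing = topics.get(topic)
      if (existing.isEmpty) topics(topic) = numPartitionsDefault
      existing
    }

    // Explicit creation is a no-op if the topic already exists.
    def createTopic(topic: String, partitions: Int): Unit =
      if (!topics.contains(topic)) topics(topic) = partitions
  }

  // Hypothetical validateTopic: retry metadata a few times, then compare counts.
  def validate(broker: FakeBroker, topic: String, expected: Int): Either[String, Int] =
    Iterator.continually(broker.topicMetadata(topic)).take(3).collectFirst { case Some(n) => n } match {
      case Some(n) if n == expected => Right(n)
      case Some(n) => Left(s"partition count $n did not match expected partition count $expected")
      case None => Left("topic does not exist")
    }

  // Same shape as the current start: only the partition-0 owner creates the topic.
  def startOld(broker: FakeBroker, topic: String, ownsPartition0: Boolean, expected: Int): Either[String, Int] = {
    if (ownsPartition0) broker.createTopic(topic, expected)
    validate(broker, topic, expected)
  }

  // Starts containers in the given order (true = owns partition 0) against one fresh broker.
  def run(order: Seq[Boolean], expected: Int, numPartitionsDefault: Int): Seq[Either[String, Int]] = {
    val broker = new FakeBroker(numPartitionsDefault)
    order.map(owns0 => startOld(broker, "__samza_checkpoint_example", owns0, expected))
  }
}
```

With `RaceDemo.run(Seq(true, false), 64, 8)` both containers see 64 partitions. Reversing the order, `RaceDemo.run(Seq(false, true), 64, 8)`, lets the first metadata request auto-create an 8-partition topic, and both containers then fail validation with the same "partition count 8 did not match expected partition count 64" shape as the reported exception.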

      If a Samza job's task.inputs list contains a stream with a non-default number of partitions (e.g. num.partitions=8, but task.inputs has a stream with 16 partitions), this race can produce a checkpoint topic with 8 partitions, and validation will fail.
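The arithmetic behind this condition can be sketched as follows. This assumes, without confirmation from the issue, that the expected checkpoint partition count is the number of distinct partition ids across all task.inputs streams (that is, the largest input partition count); `CheckpointRisk` and its methods are hypothetical names used only for illustration.

```scala
object CheckpointRisk {
  // Assumption: the expected checkpoint partition count is the number of distinct
  // partition ids across all task.inputs streams, i.e. the largest input partition count.
  def expectedCheckpointPartitions(inputPartitionCounts: Map[String, Int]): Int =
    inputPartitionCounts.values.flatMap(n => 0 until n).toSet.size

  // An auto-created checkpoint topic gets num.partitions, so validation can only
  // fail when that default differs from the expected count.
  def raceCanFail(inputPartitionCounts: Map[String, Int], numPartitionsDefault: Int): Boolean =
    expectedCheckpointPartitions(inputPartitionCounts) != numPartitionsDefault
}
```

For the example above, inputs with 16 and 8 partitions give an expected count of 16, which differs from num.partitions=8, so the job is exposed to the race; a job whose inputs all have 8 partitions is not.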

      I think the simplest fix is to remove the if statement from KafkaCheckpointManager.start and have every container try to create the checkpoint topic. This eliminates the race condition: each container calls createTopic with the correct number of partitions before it calls validateTopic, so whichever container runs first creates the topic with the expected partition count.
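A minimal sketch of the proposed fix, using the same kind of hypothetical in-memory broker as a stand-in for Kafka. It assumes that creating a topic that already exists is treated as a no-op; `FixedStart`, `FakeBroker`, `start`, and `startContainers` are illustrative names, not Samza APIs.

```scala
import scala.collection.mutable

object FixedStart {
  // Hypothetical in-memory stand-in for a Kafka broker; not a real Kafka API.
  final class FakeBroker(numPartitionsDefault: Int) {
    private val topics = mutable.Map.empty[String, Int]

    // A metadata request for a missing topic reports it as absent but auto-creates
    // it with the broker's default partition count.
    def topicMetadata(topic: String): Option[Int] = {
      val existing = topics.get(topic)
      if (existing.isEmpty) topics(topic) = numPartitionsDefault
      existing
    }

    // Explicit creation; assumed to be a no-op if the topic already exists.
    def createTopic(topic: String, partitions: Int): Unit =
      if (!topics.contains(topic)) topics(topic) = partitions
  }

  // Proposed start: no partition-0 check, so every container creates before validating.
  def start(broker: FakeBroker, topic: String, expected: Int): Either[String, Int] = {
    broker.createTopic(topic, expected)
    broker.topicMetadata(topic) match {
      case Some(n) if n == expected => Right(n)
      case Some(n) => Left(s"partition count $n did not match expected partition count $expected")
      case None => Left("topic does not exist")
    }
  }

  // Starts n containers one after another against a single fresh broker.
  def startContainers(n: Int, topic: String, expected: Int, numPartitionsDefault: Int): Seq[Either[String, Int]] = {
    val broker = new FakeBroker(numPartitionsDefault)
    Seq.fill(n)(start(broker, topic, expected))
  }
}
```

Because every container now runs the same code path, start order no longer matters: `FixedStart.startContainers(4, "__samza_checkpoint_example", 64, 8)` yields `Right(64)` for every container, even though the broker default is 8.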

      Attachments

        1. SAMZA-289.0.patch (0.6 kB, Chris Riccomini)


      People

        Assignee: Chris Riccomini (criccomini)
        Reporter: Chris Riccomini (criccomini)
        Votes: 0
        Watchers: 3
