Kafka / KAFKA-1461

Replica fetcher thread does not implement any back-off behavior

    Details

    • Type: Improvement
    • Status: Patch Available
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.8.1.1
    • Fix Version/s: 0.8.3
    • Component/s: replication
    • Labels:

      Description

      The current replica fetcher thread will retry in a tight loop if any error occurs during the fetch call. For example, we've seen cases where the fetch continuously throws a connection-refused exception, leaving several replica fetcher threads spinning in a very tight loop.

      To a much lesser degree this is also an issue in the consumer fetcher thread, although the fact that erroring partitions are removed, so that a leader can be re-discovered, helps somewhat.

      1. KAFKA-1461.patch
        19 kB
        Sriharsha Chintalapani

        Issue Links

          Activity

          Nicolae Marasoiu added a comment -

          Hi,

          So I guess in this block:

            try {
              trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
              response = simpleConsumer.fetch(fetchRequest)
            } catch {
              case t: Throwable =>
                if (isRunning.get) {
                  warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.toString))
                  partitionMapLock synchronized {
                    partitionsWithError ++= partitionMap.keys
                  }
                }
            }
          Should I add a case for the specific scenario of connection timeout/refused/reset and introduce a back-off on that path?
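
          For illustration, a minimal sketch of that idea (not the committed patch): the connection-level failures get their own case ahead of the generic handler, and fetchBackoffMs is an assumed configuration value. The catch block above would become something like:

            catch {
              // Hypothetical: back off on connection-level failures instead of retrying immediately.
              case e @ (_: java.net.ConnectException | _: java.net.SocketTimeoutException) =>
                if (isRunning.get) {
                  warn("Connection error in fetch %s. Possible cause: %s. Backing off %d ms.".format(fetchRequest, e.toString, fetchBackoffMs))
                  partitionMapLock synchronized {
                    partitionsWithError ++= partitionMap.keys
                  }
                  Thread.sleep(fetchBackoffMs) // fetchBackoffMs: assumed setting, e.g. 1000 ms
                }
              case t: Throwable =>
                // Existing handling for all other errors.
                if (isRunning.get) {
                  warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.toString))
                  partitionMapLock synchronized {
                    partitionsWithError ++= partitionMap.keys
                  }
                }
            }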

          Guozhang Wang added a comment -

          I did not realize this ticket existed, and created the same one here (KAFKA-1629). It has a more detailed explanation of the issue, though.

          Joe Stein added a comment -

          Nicolae Cismaru, are you working on this patch? If not, can we set it back to unassigned so that someone else can jump in and fix it? It sure is annoying when it happens (for example, while waiting on "Recovering unflushed segment"): during that time, every replica fetching from that broker spews ERROR kafka.server.ReplicaFetcherThread.

          Nicolae Marasoiu added a comment -

          I agree to give it to someone else; I have not made progress on this yet.

          Thank you.


          Sriharsha Chintalapani added a comment -

          Joe Stein, Nicolae Marasoiu: I can take this. I am looking at this code for another JIRA.

          Sriharsha Chintalapani added a comment - edited

          Guozhang Wang, I had the following code in mind for back-off retries in case of any error. This code would live in ReplicaFetcherThread.handlePartitionsWithErrors.
          I am thinking of maintaining two maps in ReplicaFetcherThread:
          private val partitionsWithErrorStandbyMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset
          private val partitionsWithErrorMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> timestamp
          one for the offset and one for the timestamp.
          Remove the partitions from AbstractFetcherThread.partitionMap and add them back once currentTime > partitionsWithErrorMap.timestamp + replicaFetcherRetryBackoffMs.
          I am not quite sure about maintaining these two maps. If this looks OK to you, I'll send a patch; if you have another approach in mind, please let me know.

            def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {

              // Record the failure time and the last fetched offset for newly erroring partitions.
              for (partition <- partitions) {
                if (!partitionsWithErrorMap.contains(partition)) {
                  partitionsWithErrorMap.put(partition, System.currentTimeMillis())
                  currentOffset(partition) match {
                    case Some(offset: Long) => partitionsWithErrorStandbyMap.put(partition, offset)
                    case None => // partition is no longer tracked; nothing to stash
                  }
                }
              }
              removePartitions(partitions.toSet)
              val partitionsToBeAdded = new mutable.HashMap[TopicAndPartition, Long]
              // Walk partitionsWithErrorMap and re-add partitions whose back-off time has elapsed.
              partitionsWithErrorMap.foreach {
                case (topicAndPartition, timeMs) =>
                  if (System.currentTimeMillis() > timeMs + brokerConfig.replicaFetcherRetryBackoffMs) {
                    partitionsWithErrorStandbyMap.get(topicAndPartition) match {
                      case Some(offset: Long) => partitionsToBeAdded.put(topicAndPartition, offset)
                      case None => // no stashed offset for this partition
                    }
                    partitionsWithErrorStandbyMap.remove(topicAndPartition)
                  }
              }
              addPartitions(partitionsToBeAdded)
            }
          
          Guozhang Wang added a comment -

          Sriharsha Chintalapani, sorry for the late reply.

          This fix looks good to me overall, except that we cannot add partitions back only in the handlePartitionsWithErrors() call, since that is only triggered when the next error happens. We can probably move this piece of code to processPartitionData().

          Another way to do this could be (a rough sketch follows the list below):

          1. Make the partitionMap in AbstractFetcherThread a map from TopicAndPartition to OffsetAndState, where OffsetAndState contains the offset (Long) and the state (active, inactive-with-delay). For simplicity we can just use an Int here: "active" would be 0, and inactive would be the delay time.

          2. Add another function called "delayPartitions" to AbstractFetcherThread, which sets the state to inactive with the delay time.

          3. In AbstractFetcherThread.doWork(), only include partitions with state 0 in the fetch request, and also update the state values for non-zero partitions.

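          A minimal sketch of that alternative, assuming it lives inside AbstractFetcherThread and using illustrative names (OffsetAndState, delayPartitions, eligiblePartitions) rather than the actual patch:

            // Hypothetical members added to AbstractFetcherThread; not the committed code.
            case class OffsetAndState(offset: Long, var delayMs: Int) {
              def isActive: Boolean = delayMs == 0
            }

            // partitionMap would become: mutable.HashMap[TopicAndPartition, OffsetAndState]

            // Mark the given partitions inactive for delayMs milliseconds.
            def delayPartitions(partitions: Iterable[TopicAndPartition], delayMs: Int) {
              partitionMapLock synchronized {
                for (partition <- partitions)
                  partitionMap.get(partition).foreach(state => state.delayMs = delayMs)
              }
            }

            // Called from doWork(): age the delays and return only the partitions that
            // should go into the next fetch request.
            def eligiblePartitions(elapsedMs: Int): Map[TopicAndPartition, Long] = {
              partitionMapLock synchronized {
                partitionMap.values.foreach { state =>
                  if (!state.isActive) state.delayMs = math.max(0, state.delayMs - elapsedMs)
                }
                partitionMap.collect { case (tp, state) if state.isActive => tp -> state.offset }.toMap
              }
            }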
          Idcmp added a comment - edited

          This issue can be tickled on a multi-broker configuration by having brokers advertise host names that do not exist (say, for example, you're running Kafka in Docker containers with custom hostnames).
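
          For illustration, a hypothetical broker configuration that reproduces this (the hostname is made up):

            # server.properties (hypothetical example): other brokers fetch from the
            # advertised name; if it does not resolve, their replica fetcher threads
            # fail every fetch and spin without back-off.
            advertised.host.name=kafka-1.docker.internal
            advertised.port=9092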

          Sriharsha Chintalapani added a comment -

          Created reviewboard https://reviews.apache.org/r/31366/diff/
          against branch origin/trunk

          Sriharsha Chintalapani added a comment -

          Guozhang Wang, thanks for the pointers. Can you please take a look at the patch when you get a chance?


            People

            • Assignee:
              Sriharsha Chintalapani
              Reporter:
              Sam Meder
            • Votes:
              2
              Watchers:
              6
