KAFKA-1382

Update zkVersion on partition state update failures

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.8.1.2, 0.8.2.0
    • Component/s: None
    • Labels: None

      Description

      Our updateIsr code is currently:

      private def updateIsr(newIsr: Set[Replica]) {
        debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newIsr.mkString(",")))
        val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
        // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
        val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
          ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
          ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
        if (updateSucceeded) {
          inSyncReplicas = newIsr
          zkVersion = newVersion
          trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
        } else {
          info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
        }
      }

      We encountered an interesting scenario recently when a large producer fully
      saturated the broker's NIC for over an hour. The large volume of data led to
      a number of ISR shrinks (and subsequent expands). The NIC saturation
      affected the zookeeper client heartbeats and led to a session timeout. The
      timeline was roughly as follows:

      • Attempt to expand ISR
      • Expansion written to zookeeper (confirmed in zookeeper transaction logs)
      • Session timeout after around 13 seconds. The configured timeout is 20
        seconds, and the ZooKeeper client declares a timeout after two thirds
        of that without hearing from the server, so this lines up.
      • zkclient reconnects to zookeeper (with the same session ID) and retries
        the write, but with the old zkVersion. This fails because the zkVersion
        was already incremented by the earlier write (above).
      • The ISR expand keeps failing after that and the only way to get out of it
        is to bounce the broker.
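
      The timeline above can be reproduced in miniature. The following is a
      minimal sketch, not Kafka or ZooKeeper code: VersionedNode and
      StaleVersionDemo are hypothetical names, and the class models only the
      compare-and-set behaviour of a versioned znode. It assumes the first
      write is applied on the server while its reply is lost, so the cached
      version is never advanced.

```scala
// Hypothetical in-memory stand-in for a znode with a version counter.
// It models only compare-and-set on the version, not real ZooKeeper.
final class VersionedNode(initial: String) {
  private var data: String = initial
  private var version: Int = 0

  // Returns (succeeded, newVersion), the same shape as the tuple that
  // conditionalUpdatePersistentPath returns in the description.
  def conditionalUpdate(newData: String, expectedVersion: Int): (Boolean, Int) =
    if (expectedVersion == version) {
      data = newData
      version += 1
      (true, version)
    } else (false, -1)

  def read: (String, Int) = (data, version)
}

object StaleVersionDemo {
  def run(): Seq[Boolean] = {
    val node = new VersionedNode("isr=1")
    val cachedVersion = 0 // the version the broker has cached

    // The first attempt is applied, but we deliberately ignore the result
    // to model a reply that never reaches the broker.
    node.conditionalUpdate("isr=1,2", cachedVersion)

    // Every retry reuses the stale cached version and is rejected.
    (1 to 3).map(_ => node.conditionalUpdate("isr=1,2", cachedVersion)._1)
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```

      Because nothing in the failure branch refreshes the cached version,
      the retries can never succeed, which matches the observation that only
      a broker restart clears the condition.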

      In the above code, if the zkVersion is different we should probably update
      the cached version and even retry the expansion until it succeeds.
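
      One possible shape for that suggestion is sketched below. It is an
      illustration under stated assumptions, not the attached patch:
      FakeZNode, IsrUpdateSketch, updateWithRefresh and maxAttempts are all
      hypothetical, and the znode is replaced by an in-memory stand-in. On a
      version mismatch the sketch reads the stored state; if the stored data
      already equals what it tried to write, it simply adopts the stored
      version, otherwise it retries with the refreshed version.

```scala
import scala.annotation.tailrec

// Hypothetical in-memory stand-in for the leader-and-ISR znode.
final class FakeZNode(initialData: String) {
  private var data = initialData
  private var version = 0

  def read: (String, Int) = (data, version)

  def conditionalUpdate(newData: String, expected: Int): (Boolean, Int) =
    if (expected == version) { data = newData; version += 1; (true, version) }
    else (false, -1)
}

object IsrUpdateSketch {
  // Returns the version to cache once newData is known to be stored,
  // or None if maxAttempts conditional writes were all rejected.
  @tailrec
  def updateWithRefresh(node: FakeZNode, newData: String,
                        cachedVersion: Int, maxAttempts: Int): Option[Int] =
    if (maxAttempts <= 0) None
    else {
      val (ok, newVersion) = node.conditionalUpdate(newData, cachedVersion)
      if (ok) Some(newVersion)
      else {
        val (storedData, storedVersion) = node.read
        if (storedData == newData) Some(storedVersion) // earlier write already landed
        else updateWithRefresh(node, newData, storedVersion, maxAttempts - 1)
      }
    }
}
```

      The sketch retries unconditionally once it has a fresh version. In a
      real broker the stored state may have been changed by another writer
      such as the controller, so whether a retry is safe in that case is a
      separate question that this sketch does not address.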

        Attachments

        1. KAFKA-1382.patch
          5 kB
          Sriharsha Chintalapani
        2. KAFKA-1382_2014-06-16_14:19:27.patch
          28 kB
          Sriharsha Chintalapani
        3. KAFKA-1382_2014-06-16_13:50:16.patch
          30 kB
          Sriharsha Chintalapani
        4. KAFKA-1382_2014-06-11_09:37:22.patch
          221 kB
          Sriharsha Chintalapani
        5. KAFKA-1382_2014-06-09_18:23:42.patch
          28 kB
          Sriharsha Chintalapani
        6. KAFKA-1382_2014-06-07_09:00:56.patch
          24 kB
          Sriharsha Chintalapani
        7. KAFKA-1382_2014-06-04_12:30:40.patch
          21 kB
          Sriharsha Chintalapani
        8. KAFKA-1382_2014-05-31_15:50:25.patch
          34 kB
          Sriharsha Chintalapani
        9. KAFKA-1382_2014-05-30_21:19:21.patch
          5 kB
          Sriharsha Chintalapani

              People

              • Assignee: Sriharsha Chintalapani (sriharsha)
              • Reporter: Joel Koshy (jjkoshy)
              • Votes: 0
              • Watchers: 16
