Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-4596

KIP-73 rebalance throttling breaks on plans for specific partitions

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.10.2.0
    • Component/s: None
    • Labels:
      None
    • Environment:
      Kafka 0.10.1.1

      Description

      The reassign-partitions.sh command fails if you both throttle and give it a specific partition reassignment. For example, upon reassigning

      __consumer_offsets

      partition 19, you get the following error:

      Save this to use as the --reassignment-json-file option during rollback
      Warning: You must run Verify periodically, until the reassignment completes, to ensure the throttle is removed. You can also alter the throttle by rerunning the Execute command passing a new value.
      The throttle limit was set to 1048576 B/s
      Partitions reassignment failed due to key not found: [__consumer_offsets,30]
      java.util.NoSuchElementException: key not found: [__consumer_offsets,30]
              at scala.collection.MapLike$class.default(MapLike.scala:228)
              at scala.collection.AbstractMap.default(Map.scala:59)
              at scala.collection.MapLike$class.apply(MapLike.scala:141)
              at scala.collection.AbstractMap.apply(Map.scala:59)
              at kafka.admin.ReassignPartitionsCommand$$anonfun$kafka$admin$ReassignPartitionsCommand$$preRebalanceReplicaForMovingPartitions$1.apply(ReassignPartitionsCommand.scala:37
      2)
              at kafka.admin.ReassignPartitionsCommand$$anonfun$kafka$admin$ReassignPartitionsCommand$$preRebalanceReplicaForMovingPartitions$1.apply(ReassignPartitionsCommand.scala:37
      1)
              at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
              at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
              at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
              at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
              at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
              at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
              at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
              at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
              at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
              at kafka.admin.ReassignPartitionsCommand.kafka$admin$ReassignPartitionsCommand$$preRebalanceReplicaForMovingPartitions(ReassignPartitionsCommand.scala:371)
              at kafka.admin.ReassignPartitionsCommand$$anonfun$assignThrottledReplicas$2.apply(ReassignPartitionsCommand.scala:347)
              at kafka.admin.ReassignPartitionsCommand$$anonfun$assignThrottledReplicas$2.apply(ReassignPartitionsCommand.scala:343)
              at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
              at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
              at kafka.admin.ReassignPartitionsCommand.assignThrottledReplicas(ReassignPartitionsCommand.scala:343)
              at kafka.admin.ReassignPartitionsCommand.maybeThrottle(ReassignPartitionsCommand.scala:317)
              at kafka.admin.ReassignPartitionsCommand.reassignPartitions(ReassignPartitionsCommand.scala:387)
              at kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:165)
              at kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:149)
              at kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:46)
              at kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala)
      

      This effectively breaks the throttling feature unless you want to rebalance many many partitions at once.

      For reference the command that was run is:

      kafka-reassign-partitions.sh --reassignment-json-file 9b137f13_7e91_4757_a7b9_554ff458e7df.3d4269d5-e829
      -4e86-9d05-91e9e2fcb7e7.reassignment_plan.json --throttle 1048576'
      

      and the contents of the plan file is:

      {"version":1,"partitions":[{"topic":"__consumer_offsets","partition":19,"replicas":[2,1,0]}
      

      This seems like a simple logic error to me, where we're trying to look up a partition that's not been proposed, when we should not be. It looks like the logic assumes that

      Map.apply

      doesn't error if the lookup value isn't found, when in fact it does.

      I checked that this cluster does indeed have the __consumer_offsets topic populated.

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                benstopford Ben Stopford
                Reporter:
                tcrayford-heroku Tom Crayford
              • Votes:
                0 Vote for this issue
                Watchers:
                5 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: