KAFKA-3963

Missing messages from the controller to brokers


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Invalid
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: core
    • Labels: None

    Description

      The controller takes messages from a queue and sends each one to the designated broker. If the controller times out waiting for a response from the broker (30s), it closes the connection and retries after a backoff period; however, it does not return the message to the queue. As a result, the retry starts with the next message, and the previous message may never have been received by the broker.

          val QueueItem(apiKey, apiVersion, request, callback) = queue.take()
          ...
          try {
            ...
            clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time)
            ...
            }
          } catch {
            case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
              warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
                "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
                request.toString, brokerNode.toString()), e)
              networkClient.close(brokerNode.idString)
              ...
          }
      
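      One way to preserve the message, sketched below under the assumption of a deque-backed queue, would be to push the failed item back to the head before retrying. This is a minimal, self-contained illustration rather than the actual ControllerChannelManager code; the send and sendLoop names and the QueueItem shape are illustrative only.

          import java.util.concurrent.LinkedBlockingDeque

          // Illustrative sketch: re-enqueue a failed item at the head of the deque so the
          // next iteration retries the SAME request instead of silently moving on.
          object RequeueOnFailureSketch {
            case class QueueItem(apiKey: Short, payload: String)

            // Deque instead of a plain queue so a failed item can go back to the front.
            val queue = new LinkedBlockingDeque[QueueItem]()

            // Stand-in for networkClient.blockingSendAndReceive; may throw on a timeout.
            def send(item: QueueItem): Unit =
              if (scala.util.Random.nextBoolean()) throw new RuntimeException("request timed out")

            def sendLoop(): Unit = {
              while (true) {
                val item = queue.take()
                try {
                  send(item)
                } catch {
                  case _: Throwable =>
                    // Put the failed item back at the head so it is not lost across the retry.
                    queue.putFirst(item)
                    Thread.sleep(300) // backoff before reconnecting and retrying
                }
              }
            }
          }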

      This could violate the semantics that developers assumed when writing the controller-broker protocol. For example, when the controller communicates with a newly joined broker for the first time, it sends metadata updates BEFORE sending the LeaderAndIsrRequest.

        def onBrokerStartup(newBrokers: Seq[Int]) {
          info("New broker startup callback for %s".format(newBrokers.mkString(",")))
          val newBrokersSet = newBrokers.toSet
          // send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new
          // broker via this update.
          // In cases of controlled shutdown leaders will not be elected when a new broker comes up. So at least in the
          // common controlled shutdown case, the metadata will reach the new brokers faster
          sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
          // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
          // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
          val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
          replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
      

      This is important because, without the metadata cached on the broker, a LeaderAndIsrRequest that asks the broker to become a follower would fail, since there is no metadata for the leader of the partition.

              metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
                // Only change partition state when the leader is available
                case Some(leaderBroker) =>
      ...
                case None =>
                  // The leader broker should always be present in the metadata cache.
                  // If not, we should record the error message and abort the transition process for this partition
                  stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" +
                    " %d epoch %d for partition [%s,%d] but cannot become follower since the new leader %d is unavailable.")
      
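      As a toy illustration of that dependency (class and method names below are hypothetical stand-ins, not the real broker code), the simplified follower transition only succeeds if an earlier metadata update has registered the leader; if the controller's UpdateMetadataRequest never arrives, it hits the equivalent of the None branch above.

          // Illustrative sketch of the ordering dependency; not actual Kafka broker code.
          object OrderingDependencySketch {
            case class Broker(id: Int)

            // Hypothetical stand-in for the broker-side metadata cache.
            final class MetadataCache {
              private var aliveBrokers: Set[Broker] = Set.empty
              def update(brokers: Set[Broker]): Unit = aliveBrokers = brokers // effect of an UpdateMetadataRequest
              def getAliveBrokers: Set[Broker] = aliveBrokers
            }

            // Simplified "become follower": succeeds only if the new leader is already in the cache.
            def makeFollower(cache: MetadataCache, newLeaderBrokerId: Int): Boolean =
              cache.getAliveBrokers.exists(_.id == newLeaderBrokerId)

            def main(args: Array[String]): Unit = {
              val cache = new MetadataCache
              // If the controller's UpdateMetadataRequest was lost, this update never happens:
              // cache.update(Set(Broker(1)))
              println(makeFollower(cache, newLeaderBrokerId = 1)) // false -> transition aborted for the partition
            }
          }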

          People

            Assignee: Unassigned
            Reporter: Maysam Yabandeh
            Votes: 0
            Watchers: 2
