Details
- Type: Improvement
- Status: Patch Available
- Priority: Critical
- Resolution: Unresolved
- Affects Version: 0.8.1.1
- None
- None
Description
Controlled shutdown as currently implemented can cause numerous problems: deadlocks, local and global data loss, partitions without a leader, etc. In some cases the only way to restore the cluster is to stop it completely using kill -9 and start it again.
Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase queue size makes things much worse (see discussion there).
Note 2: The problems described here can occur in any setup, but they are extremely painful in setups with large brokers (36 disks, 1000+ assigned partitions per broker in our case).
Note 3: These improvements are actually workarounds, and it is worth considering a global refactoring of the controller (make it single-threaded, or even get rid of it in favour of ZK leader elections for partitions).
The problems and improvements are:
- Controlled shutdown takes a long time (10+ minutes). The broker sends multiple shutdown requests, finally considers the shutdown failed and proceeds to an unclean shutdown, while the controller gets stuck for a while (holding a lock and waiting for free space in the controller-to-broker queue). After the broker starts back up, it receives a follower request and erases its highwatermarks (with a message that "replica does not exists", because the controller hasn't yet sent a request with the replica assignment); then, when the controller starts replicas on the broker, the broker deletes all local data (due to the missing highwatermarks). Furthermore, the controller then starts processing the pending shutdown request and stops replicas on the broker, leaving it in a non-functional state. One solution might be to increase the time the broker waits for the controller's response to a shutdown request, but this timeout is taken from controller.socket.timeout.ms, which is global for all broker-controller communication, and setting it to 30 minutes is dangerous. Proposed solution: introduce a dedicated config parameter for this timeout with a high default.
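A minimal sketch of what a dedicated timeout setting could look like, assuming plain java.util.Properties as the config source. The property name controlled.shutdown.timeout.ms and the 30-minute default are hypothetical and chosen only for illustration; they are not existing Kafka settings.

```scala
import java.util.Properties

object ShutdownTimeoutConfig {
  // Hypothetical property name, not an existing Kafka setting.
  val ControlledShutdownTimeoutMsProp = "controlled.shutdown.timeout.ms"

  // Assumed "high default": 30 minutes, the figure mentioned in the description.
  val DefaultControlledShutdownTimeoutMs: Int = 30 * 60 * 1000

  /** Reads the dedicated timeout, falling back to the high default when it is not set. */
  def controlledShutdownTimeoutMs(props: Properties): Int =
    Option(props.getProperty(ControlledShutdownTimeoutMsProp))
      .map(_.trim.toInt)
      .getOrElse(DefaultControlledShutdownTimeoutMs)
}
```

The point of the separate key is that controller.socket.timeout.ms keeps its short value for all other broker-controller traffic.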
- If a broker goes down during controlled shutdown and does not come back, the controller gets stuck in a deadlock (one thread owns the lock and tries to add a message to the dead broker's queue, the send thread is in an infinite loop retrying the message to the dead broker, and the broker failure handler is waiting for the lock). Numerous partitions are left without a leader, and the only way out is to kill -9 the controller. Proposed solution: add a timeout for adding a message to the broker's queue. ControllerChannelManager.sendRequest:
    def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
      brokerLock synchronized {
        val stateInfoOpt = brokerStateInfo.get(brokerId)
        stateInfoOpt match {
          case Some(stateInfo) =>
            // ODKL Patch: prevent infinite hang on trying to send message to a dead broker.
            // TODO: Move timeout to config
            if (!stateInfo.messageQueue.offer((request, callback), 10, TimeUnit.SECONDS)) {
              error("Timed out trying to send message to broker " + brokerId.toString)
              // Do not throw, as it brings controller into completely non-functional state
              // "Controller to broker state change requests batch is not empty while creating a new one"
              //throw new IllegalStateException("Timed out trying to send message to broker " + brokerId.toString)
            }
          case None =>
            warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
        }
      }
    }
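To see the bounded enqueue in isolation, here is a small standalone sketch built only on java.util.concurrent.LinkedBlockingQueue. The object and method names are made up for this example and are not part of the patch.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

object BoundedEnqueue {
  /**
   * Tries to enqueue the item, waiting at most timeoutMs for free space.
   * Returns false on timeout instead of blocking forever the way put() would.
   */
  def offerWithTimeout[A](queue: LinkedBlockingQueue[A], item: A, timeoutMs: Long): Boolean =
    queue.offer(item, timeoutMs, TimeUnit.MILLISECONDS)
}
```

With a queue of capacity 1 that nobody drains (the dead-broker case), the first offerWithTimeout call succeeds and the second returns false once the timeout elapses, so the caller can release the lock it holds.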
- When a broker that is the controller starts shutting down while auto leader rebalance is running, it ends up deadlocked (the shutdown thread owns the lock and waits for the rebalance thread to exit, while the rebalance thread waits for the lock). Proposed solution: use a bounded wait in the rebalance thread. KafkaController.scala:
    // ODKL Patch to prevent deadlocks in shutdown.
    /**
     * Execute the given function inside the lock
     */
    def inLockIfRunning[T](lock: ReentrantLock)(fun: => T): T = {
      if (isRunning || lock.isHeldByCurrentThread) {
        // TODO: Configure timeout.
        if (!lock.tryLock(10, TimeUnit.SECONDS)) {
          throw new IllegalStateException("Failed to acquire controller lock in 10 seconds.");
        }
        try {
          return fun
        } finally {
          lock.unlock()
        }
      } else {
        throw new IllegalStateException("Controller is not running, not allowed to lock.")
      }
    }

    private def checkAndTriggerPartitionRebalance(): Unit = {
      // Use inLockIfRunning here instead of inLock
    }
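The bounded-wait idea can be tried outside the controller with the following sketch. It is a simplified variant rather than the patch itself: it drops the isRunning check and returns None on timeout instead of throwing, and the names are invented for this example.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

object BoundedLock {
  /**
   * Runs fun while holding the lock, but gives up after timeoutMs
   * rather than waiting indefinitely. None means the lock was not acquired.
   */
  def inLockWithTimeout[T](lock: ReentrantLock, timeoutMs: Long)(fun: => T): Option[T] =
    if (lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
      try Some(fun) finally lock.unlock()
    } else None
}
```

If another thread holds the lock for longer than the timeout, the caller gets None back and can exit, which is what lets a waiting shutdown thread make progress.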
- Both OfflinePartitionLeaderSelector and ControlledShutdownLeaderSelector prefer the oldest replica in the ISR (the one that joined the ISR first). During a rolling update this moves partitions to the tail, which increases the overall number of leadership movements and ends up significantly overloading the last broker (with 4 brokers and RF 3 the last one gets 3/4 of the leadership). With multiple failures this logic can cause a significant imbalance in leadership. Proposed solution: move leadership to the preferred replica if possible, or otherwise to the youngest replica (in controlled shutdown) or the second preferred replica (in offline partition):
    class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
      ...
      // ODKL Patch: Trying to select replica from ISR not depending on ISR join order, but following the
      // assignment order. Preferred replica is the first one, thus if possible it'll be chosen, but most
      // probably it is the dead one, thus we fall back to second preferred replica. Here we do not care about
      // overloading second preferred replica as we do not expect rolling crashes.
      val newLeader = liveBrokersInIsr.sortBy(x => assignedReplicas.indexOf(x)).head
      ...
    }

    class ControlledShutdownLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
      ...
      // ODKL Patch: Trying to select replica from ISR not depending on ISR join order. If preferred replica is in ISR, choose
      // it, otherwise choose the last replica from ISR - it is expected to be the youngest (most probably already survived
      // rolling update)
      val newLeaderOpt = if (newIsr.contains(assignedReplicas.head)) assignedReplicas.headOption else newIsr.lastOption
      ...
    }
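The two selection rules can be checked on plain broker-id lists with the sketch below. The object and method names are illustrative only, and the sketch assumes the ISR is a subset of the assigned replicas (the patch's sortBy/indexOf form behaves differently if it is not).

```scala
object LeaderChoice {
  /** Offline-partition rule: first live ISR member in assignment order, if any. */
  def offlineLeader(assignedReplicas: Seq[Int], liveBrokersInIsr: Seq[Int]): Option[Int] =
    assignedReplicas.find(r => liveBrokersInIsr.contains(r))

  /** Controlled-shutdown rule: preferred replica if still in ISR, else the last ISR member. */
  def shutdownLeader(assignedReplicas: Seq[Int], newIsr: Seq[Int]): Option[Int] =
    assignedReplicas.headOption
      .filter(r => newIsr.contains(r))
      .orElse(newIsr.lastOption)
}
```

For example, with assignment (1, 2, 3), broker 1 dead and ISR join order (3, 2), offlineLeader picks broker 2, the second preferred replica. With assignment (2, 3, 4) and broker 2 shutting down so that the new ISR is (3, 4), shutdownLeader picks broker 4, the youngest ISR member.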
- Auto leader rebalance started simultaneously with controlled shutdown competes with it for space in the queue and can slow down the process. If the queue size is large, it could also cause significant data loss (for a few minutes there might be multiple brokers each considering itself the leader and accepting produce requests). Proposed solution: add throttling to the auto rebalance:
    private def checkAndTriggerPartitionRebalance(): Unit = {
      ...
      if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
        info("Balancing broker " + leaderBroker + " with imbalance rate " + imbalanceRatio)
        topicsNotInPreferredReplica.foreach {
          case (topicPartition, replicas) => {
            inLockIfRunning(controllerContext.controllerLock) {
              // do this check only if the broker is live and there are no partitions being reassigned currently
              // and preferred replica election is not in progress
              if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
                  controllerContext.partitionsBeingReassigned.size == 0 &&
                  controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
                  !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
                  !deleteTopicManager.isTopicDeletionInProgress(topicPartition.topic) &&
                  controllerContext.allTopics.contains(topicPartition.topic)) {
                onPreferredReplicaElection(Set(topicPartition), true)
              }
            }
            // ODKL patch: prevent too fast preferred replica elections.
            // TODO: Make configurable/use true throttling
            Utils.swallow(Thread.sleep(2000))
          }
        }
        info("Balancing broker " + leaderBroker + " done")
      }
      ...
    }
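The fixed-delay throttling used above can be sketched as a generic helper. This is only an illustration with invented names; like the patch, it is a fixed pause after each item, not true rate limiting.

```scala
object ThrottledRun {
  /**
   * Applies action to each item and then sleeps pauseMs, so that
   * consecutive actions are spaced out by at least that pause.
   */
  def foreachThrottled[A](items: Iterable[A], pauseMs: Long)(action: A => Unit): Unit =
    items.foreach { item =>
      action(item)
      try Thread.sleep(pauseMs)
      catch {
        // Restore the interrupt flag; remaining items then run without pausing.
        case _: InterruptedException => Thread.currentThread().interrupt()
      }
    }
}
```

In the rebalance loop, each per-partition election would be the action and 2000 ms the pause, which leaves room in the controller-to-broker queue for shutdown traffic between elections.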
Attachments
Issue Links
- relates to KAFKA-2139 Add a separate controller messge queue with higher priority on broker side (Resolved)