Kafka / KAFKA-1414

Speedup broker startup after hard reset

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.1.1, 0.8.2.0, 0.10.1.0
    • Fix Version/s: 0.8.2.0
    • Component/s: log
    • Labels: None

      Description

      After a hard reset caused by a power failure, the broker takes far too long to start because it recovers unflushed segments in a single thread. This could easily be improved by launching multiple threads (one per data directory, assuming that each data directory typically sits on a dedicated drive). Locally we tried this simple patch to LogManager.loadLogs and it seems to work; however, I'm too new to Scala, so do not take it literally:

        /**
         * Recover and load all logs in the given data directories,
         * using one thread per data directory so that directories on
         * different drives are recovered in parallel.
         */
        private def loadLogs(dirs: Seq[File]) {
          // Thread.join() does not report exceptions thrown inside a worker,
          // so keep the first failure here and rethrow it after all joins
          val firstError = new java.util.concurrent.atomic.AtomicReference[Throwable]()
      
          val threads = dirs.map { dir =>
            val thread = new Thread(new Runnable {
              def run() {
                try {
                  val recoveryPoints = recoveryPointCheckpoints(dir).read
                  /* load the logs */
                  val subDirs = dir.listFiles()
                  if(subDirs != null) {
                    val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
                    if(cleanShutDownFile.exists())
                      info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
                    // logDir is one topic-partition directory inside the data directory
                    for(logDir <- subDirs) {
                      if(logDir.isDirectory) {
                        info("Loading log '" + logDir.getName + "'")
                        val topicPartition = Log.parseTopicPartitionName(logDir.getName)
                        val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
                        val log = new Log(logDir,
                          config,
                          recoveryPoints.getOrElse(topicPartition, 0L),
                          scheduler,
                          time)
                        val previous = addLogWithLock(topicPartition, log)
                        if(previous != null)
                          throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
                      }
                    }
                    cleanShutDownFile.delete()
                  }
                } catch {
                  case e: Throwable => firstError.compareAndSet(null, e)
                }
              }
            })
            thread.start()
            thread
          }
      
          // Wait for every data directory to finish, then surface any failure
          threads.foreach(_.join())
          if(firstError.get != null)
            throw firstError.get
        }
      
        // Called concurrently by the loader threads, so registration is serialized on the lock
        def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
          logCreationOrDeletionLock synchronized {
            this.logs.put(topicPartition, log)
          }
        }
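The same idea (one recovery task per data directory, a shared registry of loaded logs, and a duplicate check) can be sketched with a fixed thread pool instead of hand-managed threads; some attachment names mention a thread pool, but this is not the committed patch. It is a minimal, self-contained sketch under stated assumptions: `ParallelLogLoading`, `loadAll`, `listPartitions` and `recover` are hypothetical names, and `recover` stands in for constructing (and thereby recovering) a `Log`. One design point it shows: with plain threads, `Thread.join` does not surface an exception thrown inside the worker, whereas `Future.get` rethrows it wrapped in an `ExecutionException`.

```scala
import java.util.concurrent.{Callable, ExecutionException, Executors, Future}
import scala.collection.concurrent.TrieMap

// Hypothetical sketch, not the committed KAFKA-1414 patch: one recovery task per data directory.
object ParallelLogLoading {

  /** Recovers every partition in every data directory and returns partition -> loaded log.
    * `listPartitions` and `recover` are hypothetical stand-ins for listing a data
    * directory's subdirectories and constructing (recovering) a Log for one of them.
    */
  def loadAll[D, P, L](dataDirs: Seq[D],
                       listPartitions: D => Seq[P],
                       recover: (D, P) => L): Map[P, L] = {
    // Thread-safe registry shared by all tasks (plays the role of LogManager.logs)
    val loaded = TrieMap.empty[P, L]
    // One worker per data directory, assuming each directory is on its own drive
    val pool = Executors.newFixedThreadPool(math.max(1, dataDirs.size))
    try {
      // toVector forces submission of every task before we start waiting
      val futures: Vector[Future[Unit]] = dataDirs.toVector.map { dir =>
        pool.submit(new Callable[Unit] {
          def call(): Unit =
            for (p <- listPartitions(dir)) {
              val log = recover(dir, p)
              // putIfAbsent is atomic, so duplicate detection needs no extra lock
              if (loaded.putIfAbsent(p, log).isDefined)
                throw new IllegalArgumentException(s"Duplicate log directories found for $p")
            }
        })
      }
      // Future.get rethrows a task's failure wrapped in ExecutionException; unwrap it
      futures.foreach { f =>
        try f.get()
        catch { case e: ExecutionException => throw e.getCause }
      }
    } finally {
      pool.shutdown() // accept no new tasks; threads exit once remaining work finishes
    }
    loaded.toMap
  }
}
```

For example, with two data directories the pool runs two tasks concurrently, and a partition that appears in both directories makes `loadAll` throw `IllegalArgumentException`, mirroring the duplicate-directory check in `loadLogs`.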
      

        Attachments

        1. parallel-dir-loading-trunk-threadpool.patch (4 kB, Alexey Ozeritskiy)
        2. parallel-dir-loading-trunk-fixed-threadpool.patch (9 kB, Anton Karamanov)
        3. parallel-dir-loading-trunk.patch (3 kB, Alexey Ozeritskiy)
        4. parallel-dir-loading-0.8.patch (3 kB, Alexey Ozeritskiy)
        5. KAFKA-1414-rev5.patch (26 kB, Anton Karamanov)
        6. KAFKA-1414-rev4.patch (25 kB, Anton Karamanov)
        7. KAFKA-1414-rev3-interleaving.patch (26 kB, Anton Karamanov)
        8. KAFKA-1414-rev3.patch (25 kB, Anton Karamanov)
        9. KAFKA-1414-rev2.patch (23 kB, Anton Karamanov)
        10. KAFKA-1414-rev2.fixed.patch (23 kB, Anton Karamanov)
        11. KAFKA-1414-rev1.patch (26 kB, Anton Karamanov)
        12. freebie.patch (3 kB, Anton Karamanov)
        13. 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch (15 kB, Anton Karamanov)


            People

            • Assignee: Anton Karamanov (ataraxer)
            • Reporter: Dmitry Bugaychenko (dmitrybugaychenko)
            • Votes: 5
            • Watchers: 9
