Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-617

Refactor KafkaApis.handleListOffsetRequest

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels:
      None

      Description

      This code path is just really bad.

      All the business logic is in LogManager and Log.

      Also the implementation is really funky.

      LogManager already supports getLog and log allows you to get the segment list so there shouldn't be any logic except in KafkaApis.

        Activity

        Hide
        jkreps Jay Kreps added a comment -

        Evidence of code funk:

        def getOffsetsBefore(timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
        val segsArray = segments.view
        var offsetTimeArray: Array[(Long, Long)] = null
        if(segsArray.last.size > 0)
        offsetTimeArray = new Array[(Long, Long)](segsArray.length + 1)
        else
        offsetTimeArray = new Array[(Long, Long)](segsArray.length)

        for(i <- 0 until segsArray.length)
        offsetTimeArray = (segsArray.start, segsArray.messageSet.file.lastModified)
        if(segsArray.last.size > 0)
        offsetTimeArray(segsArray.length) = (logEndOffset, time.milliseconds)

        var startIndex = -1
        timestamp match {
        case OffsetRequest.LatestTime =>
        startIndex = offsetTimeArray.length - 1
        case OffsetRequest.EarliestTime =>
        startIndex = 0
        case _ =>
        var isFound = false
        debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
        startIndex = offsetTimeArray.length - 1
        while (startIndex >= 0 && !isFound)

        { if (offsetTimeArray(startIndex)._2 <= timestamp) isFound = true else startIndex -=1 }

        }

        val retSize = maxNumOffsets.min(startIndex + 1)
        val ret = new Array[Long](retSize)
        for(j <- 0 until retSize)

        { ret(j) = offsetTimeArray(startIndex)._1 startIndex -= 1 }

        // ensure that the returned seq is in descending order of offsets
        ret.toSeq.sortBy(- _)
        }

        I suspect this predates our knowledge of using scala collections fully and can maybe be reduced to a few maps and filters.

        Show
        jkreps Jay Kreps added a comment - Evidence of code funk: def getOffsetsBefore(timestamp: Long, maxNumOffsets: Int): Seq [Long] = { val segsArray = segments.view var offsetTimeArray: Array [(Long, Long)] = null if(segsArray.last.size > 0) offsetTimeArray = new Array [(Long, Long)] (segsArray.length + 1) else offsetTimeArray = new Array [(Long, Long)] (segsArray.length) for(i <- 0 until segsArray.length) offsetTimeArray = (segsArray .start, segsArray .messageSet.file.lastModified) if(segsArray.last.size > 0) offsetTimeArray(segsArray.length) = (logEndOffset, time.milliseconds) var startIndex = -1 timestamp match { case OffsetRequest.LatestTime => startIndex = offsetTimeArray.length - 1 case OffsetRequest.EarliestTime => startIndex = 0 case _ => var isFound = false debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2))) startIndex = offsetTimeArray.length - 1 while (startIndex >= 0 && !isFound) { if (offsetTimeArray(startIndex)._2 <= timestamp) isFound = true else startIndex -=1 } } val retSize = maxNumOffsets.min(startIndex + 1) val ret = new Array [Long] (retSize) for(j <- 0 until retSize) { ret(j) = offsetTimeArray(startIndex)._1 startIndex -= 1 } // ensure that the returned seq is in descending order of offsets ret.toSeq.sortBy(- _) } I suspect this predates our knowledge of using scala collections fully and can maybe be reduced to a few maps and filters.
        Hide
        jozi-k Jozef Koval added a comment -

        Hi Jay Kreps, I would like to work on this issue, but handleOffsetRequest method does not exist anymore. However, there are commit, fetch and for leader epoch requests. Can you please update the issue according to newest changes so I can go on? Many thanks.

        Show
        jozi-k Jozef Koval added a comment - Hi Jay Kreps , I would like to work on this issue, but handleOffsetRequest method does not exist anymore. However, there are commit, fetch and for leader epoch requests. Can you please update the issue according to newest changes so I can go on? Many thanks.
        Hide
        guozhang Guozhang Wang added a comment -

        Jozef Koval I think it has been named to handleListOffsetRequest now, updated the description as well.

        Show
        guozhang Guozhang Wang added a comment - Jozef Koval I think it has been named to handleListOffsetRequest now, updated the description as well.

          People

          • Assignee:
            Unassigned
            Reporter:
            jkreps Jay Kreps
          • Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

            • Created:
              Updated:

              Development