Kafka
  1. Kafka
  2. KAFKA-617

Refactor KafkaApis.handleOffsetRequest

    Details

    • Type: Improvement Improvement
    • Status: Open
    • Priority: Major Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels:
      None

      Description

      This code path is just really bad.

      All the business logic is in LogManager and Log.

      Also the implementation is really funky.

      LogManager already supports getLog and log allows you to get the segment list so there shouldn't be any logic except in KafkaApis.

        Activity

        Hide
        Jay Kreps added a comment -

        Evidence of code funk:

        def getOffsetsBefore(timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
        val segsArray = segments.view
        var offsetTimeArray: Array[(Long, Long)] = null
        if(segsArray.last.size > 0)
        offsetTimeArray = new Array[(Long, Long)](segsArray.length + 1)
        else
        offsetTimeArray = new Array[(Long, Long)](segsArray.length)

        for(i <- 0 until segsArray.length)
        offsetTimeArray = (segsArray.start, segsArray.messageSet.file.lastModified)
        if(segsArray.last.size > 0)
        offsetTimeArray(segsArray.length) = (logEndOffset, time.milliseconds)

        var startIndex = -1
        timestamp match {
        case OffsetRequest.LatestTime =>
        startIndex = offsetTimeArray.length - 1
        case OffsetRequest.EarliestTime =>
        startIndex = 0
        case _ =>
        var isFound = false
        debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
        startIndex = offsetTimeArray.length - 1
        while (startIndex >= 0 && !isFound)

        { if (offsetTimeArray(startIndex)._2 <= timestamp) isFound = true else startIndex -=1 }

        }

        val retSize = maxNumOffsets.min(startIndex + 1)
        val ret = new Array[Long](retSize)
        for(j <- 0 until retSize)

        { ret(j) = offsetTimeArray(startIndex)._1 startIndex -= 1 }

        // ensure that the returned seq is in descending order of offsets
        ret.toSeq.sortBy(- _)
        }

        I suspect this predates our knowledge of using scala collections fully and can maybe be reduced to a few maps and filters.

        Show
        Jay Kreps added a comment - Evidence of code funk: def getOffsetsBefore(timestamp: Long, maxNumOffsets: Int): Seq [Long] = { val segsArray = segments.view var offsetTimeArray: Array [(Long, Long)] = null if(segsArray.last.size > 0) offsetTimeArray = new Array [(Long, Long)] (segsArray.length + 1) else offsetTimeArray = new Array [(Long, Long)] (segsArray.length) for(i <- 0 until segsArray.length) offsetTimeArray = (segsArray .start, segsArray .messageSet.file.lastModified) if(segsArray.last.size > 0) offsetTimeArray(segsArray.length) = (logEndOffset, time.milliseconds) var startIndex = -1 timestamp match { case OffsetRequest.LatestTime => startIndex = offsetTimeArray.length - 1 case OffsetRequest.EarliestTime => startIndex = 0 case _ => var isFound = false debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2))) startIndex = offsetTimeArray.length - 1 while (startIndex >= 0 && !isFound) { if (offsetTimeArray(startIndex)._2 <= timestamp) isFound = true else startIndex -=1 } } val retSize = maxNumOffsets.min(startIndex + 1) val ret = new Array [Long] (retSize) for(j <- 0 until retSize) { ret(j) = offsetTimeArray(startIndex)._1 startIndex -= 1 } // ensure that the returned seq is in descending order of offsets ret.toSeq.sortBy(- _) } I suspect this predates our knowledge of using scala collections fully and can maybe be reduced to a few maps and filters.

          People

          • Assignee:
            Unassigned
            Reporter:
            Jay Kreps
          • Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

            • Created:
              Updated:

              Development