Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: core
    • Labels:

      Description

      It would be good to add the following metrics:

      Producer: droppedMessageRate per topic

      ReplicaManager: partition count on the broker

      FileMessageSet: logFlushTimer per log (i.e., partition). Also, logFlushTime should probably be moved to LogSegment since the flush now includes index flush time.

      1. kafka_604_v1.patch
        12 kB
        Yang Ye
      2. kafka_604_v2.patch
        12 kB
        Yang Ye
      3. kafka_604_v3.diff
        17 kB
        Yang Ye
      4. kafka-604-new.patch
        6 kB
        Swapnil Ghike
      5. kafka-604-new-v2.patch
        17 kB
        Swapnil Ghike

        Activity

        Neha Narkhede made changes -
        Status Resolved [ 5 ] Closed [ 6 ]
        Jun Rao made changes -
        Status Open [ 1 ] Resolved [ 5 ]
        Fix Version/s 0.8 [ 12317244 ]
        Resolution Fixed [ 1 ]
        Jun Rao added a comment -

        Thanks for the latest patch. Committed to 0.8.

        Swapnil Ghike made changes -
        Attachment kafka-604-new-v2.patch [ 12570594 ]
        Swapnil Ghike added a comment -

        Thanks, Jun, for observing that "All.Topics" does not uniquely distinguish the aggregate stats for all topics from the stats for a topic that is itself named All.Topics.

        Made a change so that the stats for all topics will look like "AllTopicsMessagesPerSec", and those for a topic called AllTopics will look like "AllTopics-MessagesPerSec". Same change for All.Brokers too.
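        The naming rule described above could be sketched as follows. This is a hypothetical illustration, not Kafka's actual code: the function name and Option-based signature are made up for the example; only the resulting metric names come from the comment.

```scala
// Hypothetical helper illustrating the disambiguation rule: the aggregate
// all-topics metric has no separator, while a per-topic metric gets a "-",
// so a topic literally named "AllTopics" cannot collide with the aggregate.
def messagesPerSecName(topic: Option[String]): String = topic match {
  case None    => "AllTopics" + "MessagesPerSec"  // aggregate over all topics
  case Some(t) => t + "-" + "MessagesPerSec"      // per-topic metric
}
```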

        Swapnil Ghike made changes -
        Attachment kafka-604-new.patch [ 12570588 ]
        Swapnil Ghike added a comment -

        Added droppedMessageRate per topic in Producer, partition count on the broker in ReplicaManager.

        Moved droppedMessageRate to per-topic stats, the droppedMessageRate for all topics together is now provided by allTopicsStats in ProducerTopicStats.

        Removed the thread id from the ProducerQueueSize gauge.

        Neha Narkhede made changes -
        Labels p1 p2
        Neha Narkhede made changes -
        Labels p1
        Swapnil Ghike made changes -
        Assignee Swapnil Ghike [ swapnilghike ]
        Swapnil Ghike added a comment -

        I can do this after finishing KAFKA-755.

        Jun Rao added a comment -

        Also, in ProducerSendThread, we should get rid of the thread id part from the jmx bean name.
        newGauge(clientId + "ProducerQueueSize" + getId,
          new Gauge[Int] {
            def getValue = queue.size
          })
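        For illustration, here is a self-contained sketch of the suggested change. The Gauge trait, newGauge helper, clientId, and queue below are stand-in stubs so the snippet compiles on its own; Kafka's real metrics helpers differ.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Stub stand-ins so the sketch is self-contained (not Kafka's declarations).
trait Gauge[T] { def getValue: T }
def newGauge[T](name: String, gauge: Gauge[T]): (String, Gauge[T]) = (name, gauge)

val clientId = "producer-1"                   // hypothetical client id
val queue = new LinkedBlockingQueue[String]()

// Bean name without the per-thread getId suffix, per the comment above,
// so the JMX name stays stable across producer send threads.
val (beanName, sizeGauge) = newGauge(clientId + "ProducerQueueSize",
  new Gauge[Int] { def getValue = queue.size })
```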

        Jun Rao made changes -
        Priority Major [ 3 ] Blocker [ 1 ]
        Jun Rao added a comment -

        This patch no longer applies. Thinking a bit. We probably just need to get the following stats in jmx. The per topic level flush time is probably overkill.

        Producer: droppedMessageRate per topic

        ReplicaManager: partition count on the broker

        Yang Ye added a comment -

        Joel, thanks for the idea; it's good, and I will do a new patch.

        Swapnil Ghike added a comment -

        Yes, I like this idea; I can follow it up in KAFKA-646.

        Joel Koshy added a comment -

        The rebased patch does not really comment on/address the concern in the original review - i.e., it breaks a relatively clean
        and convenient syntax in the common case (of updating a single timer) to support updates to multiple timers. It's obviously
        not a particularly major issue, but a rung higher than say, a debate over whitespace/coding style. My only point is that we
        should try and avoid making a change that goes from a nice syntax to a rather inconvenient syntax. For that reason, I'm
        more in favor of the change to KafkaTimer in KAFKA-646.

        That said, how about the following: both these patches need multiple timer updates to update an aggregate timer as well
        as a specific timer - i.e., up to this point we don't really have a use case for updating more than two timers simultaneously.
        So we can accomplish this case with the following:

        aggregateKafkaTimer.time {
          specificKafkaTimer.time {
            <code block>
          }
        }

        and avoid any change to KafkaTimer. Does that seem reasonable to you guys?

        Swapnil Ghike added a comment -

        Wow, this is going to be fun. Unfortunately patch v3 here has a lot of conflicts with patch v5 at KAFKA-646 where I have done a wholesale reorganization of metrics. The conflicts are mostly in the use of KafkaTimer.

        I agree that having a KafkaTimer object might be more efficient at runtime, but the tradeoff is that it changes the syntax of time measurement, with some instances where we will need to pass in ({a bunch of statements spread out over multiple lines in a block}, timer).

        Victor, can you take a look at patch v5 at the other jira and see which style of time measurement you prefer?

        Perhaps Joel can also comment on the syntax changes.

        Yang Ye made changes -
        Attachment kafka_604_v3.diff [ 12560497 ]
        Yang Ye added a comment -

        1. Rebased.

        2. Added new timer metrics.

        Unit tests passed and the new timer MBeans are verified.

        This patch should be checked in soon, as it touches quite a few files whose future changes may need further rebasing.

        Yang Ye added a comment -

        Sure, I'll do that soon


        Jun Rao added a comment -

        The patch no longer applies. We should probably also add the following metrics in the Producer:
        1. Serialization time: time to encode each event.
        2. Handle time: time to handle one or a batch of events, which includes serialization time, request sending time, and retry time.

        Yang Ye made changes -
        Attachment kafka_604_v2.patch [ 12555263 ]
        Yang Ye added a comment -

        rebased patch

        Joel Koshy added a comment -

        Couple other options that we discussed offline for KafkaTimer:

        • Add the new multi-timer code to the object level
        • Since the most common (only?) use-case is updating a specific and a global timer, have an additional Option[Timer] for the global timer in the constructor that defaults to None
        • Currying. e.g.,

        def timeWith[A](timers: Timer*)(f: => A) = {
          val ctxSeq = timers.map(_.time())
          try {
            f
          } finally {
            ctxSeq.foreach(_.stop())
          }
        }

        and usage would be:

        KafkaTimer.timeWith(specificTimer, globalTimer) {
          // block of code to time
        }

        Clearly I'm obsessive compulsive but I think it would be good to avoid breaking the clean user-side syntax that the existing API allows.

        Joel Koshy added a comment -

        Looks good overall. Some initial comments:

        KafkaTimer:
        I see why you needed to add the Seq-based time method: to update a specific and
        a global timer in one shot. However, we should still keep the previous (class)
        method, because the new method is slightly clunky to use (in having to wrap
        everything in the function-call parentheses). The main goal of that method
        was convenience; otherwise the user could directly deal with the timer and its
        context. So can you restore that code and just use the new object method where
        you need it?

        With the above, the following can be reverted:
        SimpleConsumer
        KafkaController
        ReplicaStateMachine
        SyncProducer
        KafkaTimerTest (although we should additionally exercise your new object
        method).

        Log.scala:

        • Can you break line 488?
        Yang Ye made changes -
        Field Original Value New Value
        Attachment kafka_604_v1.patch [ 12552740 ]
        Yang Ye added a comment -

        1. Created a KafkaTimer.time static function which takes a functor and a variable number of timers as input, to support multiple timers.

        2. Added a per-partition log flush timer.

        3. Made the log flush time cover flushing both the index and the data log.

        Jun Rao created issue -

          People

          • Assignee:
            Swapnil Ghike
            Reporter:
            Jun Rao
          • Votes:
            0
            Watchers:
            4

            Dates

            • Created:
              Updated:
              Resolved:

              Time Tracking

              Estimated: 24h
              Remaining: 24h
              Logged: Not Specified
