Kafka / KAFKA-550

Wildcarded consumption is single-threaded

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels:
      None

      Description

      It's surprising that we haven't noticed this before, but I was looking at a CPU usage profile in YourKit and it turns out that only one mirror maker thread is actually doing anything. I suspect a bug in the fetcher -> queue mapping: only one queue seems to have any data. I'll look into this, probably next week.

      1. KAFKA-550-v1.patch
        2 kB
        Joel Koshy
      2. KAFKA-550-v2.patch
        3 kB
        Joel Koshy

        Activity

        Jun Rao added a comment -

        Could you open a separate JIRA for improving partition assignment for wildcard topics? This is probably beyond 0.8 though.

        Joel Koshy added a comment -

        Committed to 0.8 and trunk.

        Neha Narkhede added a comment -

        Good catch, +1

        Joel Koshy added a comment -

        Another point to consider for future enhancement is that right now all wildcard topics will be assigned
        num-threads (i.e., the wildcard stream count) threads regardless of how many partitions
        they have.

        Ideally, the user should be able to say something like "numThreads=100" and messages from all
        topic-partitions (that are allowed by the wildcard) should be fed into those queues/streams uniformly.
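The uniform distribution described above could look roughly like the following. This is only a sketch of the idea, not anything from the patch or from Kafka itself: `UniformAssignmentSketch`, `TopicPartition`, and `assign` are hypothetical names, and topic-partitions are modeled as plain `(topic, partitionId)` tuples that are dealt round-robin over `numThreads` stream indices.

```scala
object UniformAssignmentSketch {
  // Hypothetical model: a topic-partition is just (topic, partitionId).
  type TopicPartition = (String, Int)

  /** Deal the sorted topic-partitions round-robin over numThreads stream indices. */
  def assign(partitions: Seq[TopicPartition], numThreads: Int): Map[Int, Seq[TopicPartition]] = {
    require(numThreads > 0, "numThreads must be positive")
    partitions.sorted.zipWithIndex
      .groupBy { case (_, index) => index % numThreads }
      .map { case (stream, indexed) => stream -> indexed.map(_._1) }
  }

  def main(args: Array[String]): Unit = {
    // All partitions of all topics matched by the wildcard, regardless of topic.
    val partitions = Seq(("logs", 0), ("logs", 1), ("logs", 2), ("metrics", 0), ("metrics", 1))
    assign(partitions, 2).toSeq.sortBy(_._1).foreach { case (stream, ps) =>
      println(s"stream $stream <- ${ps.mkString(", ")}")
    }
  }
}
```

With this shape the number of streams is chosen once by the user, and a topic with many partitions simply contributes more entries to the deal than a topic with few.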

        Joel Koshy added a comment -

        The bug was not in the implicit function. I removed the implicit because it
        could be easily avoided in this case.
        The issue was here: for (tt <- topicThreadIds; qs <- queuesAndStreams)
        yield (tt -> qs)

        If there were topic-threadIds t1, t2, and queues q1, q2, you would get (t1
        -> q1) (t1 -> q2) (t2 -> q1) (t2 -> q2) and they were sequentially added to
        the topicThreadIdAndQueues pool. So all the threads would feed into the same
        queue.

        Re: the computation is different because topicThreadIds in wildcard
        consumption may share the same queue so you can't zip them directly. e.g.,
        whether there are two or 100 allowed topics, they will all share the same
        num-streams queues, and num-streams != the number of threads, so you can't
        zip them. In the static topic count case, there is one-to-one correspondence
        so you can zip them.

        That said, I think the code can be made clearer here. I will upload another
        patch in a minute.
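The effect described in this comment can be reproduced in isolation. The sketch below is not the code from the patch: it uses a plain mutable map in place of the topicThreadIdAndQueues pool, strings in place of real queues, and hypothetical names (`WildcardMappingSketch`, `buggy`, `repaired`, `queueByThread`). The repair shown is just one possible approach, and it assumes there is exactly one queue per consumer thread in the wildcard case.

```scala
import scala.collection.mutable

object WildcardMappingSketch {
  // Hypothetical stand-ins: a topic-threadId is (topic, consumerThreadId),
  // and a queue is represented only by its name.
  type TopicThreadId = (String, String)

  val threadIds: List[String] = List("consumer-0", "consumer-1")
  val topics: List[String] = List("topicA", "topicB", "topicC")
  val queues: List[String] = List("q1", "q2") // assumption: one queue per thread

  val topicThreadIds: List[TopicThreadId] =
    for (topic <- topics; threadId <- threadIds) yield (topic, threadId)

  // Buggy shape: the for-comprehension yields the full cartesian product,
  // and sequential puts overwrite each key until only the last queue remains.
  val buggy: Map[TopicThreadId, String] = {
    val pool = mutable.Map.empty[TopicThreadId, String]
    val pairs = for (tt <- topicThreadIds; q <- queues) yield (tt -> q)
    pairs.foreach { case (tt, q) => pool.put(tt, q) }
    pool.toMap
  }

  // One possible repair: choose the queue by consumer thread, so every topic
  // handled by the same thread shares that thread's queue.
  val queueByThread: Map[String, String] = threadIds.zip(queues).toMap
  val repaired: Map[TopicThreadId, String] =
    topicThreadIds.map { case tt @ (_, threadId) => tt -> queueByThread(threadId) }.toMap

  def main(args: Array[String]): Unit = {
    println(s"queues used by buggy mapping:    ${buggy.values.toSet}")
    println(s"queues used by repaired mapping: ${repaired.values.toSet}")
  }
}
```

In the buggy mapping every topic-threadId ends up on the last queue, which matches the single busy thread seen in the profile. In the static topic count case the two lists have the same length, so a direct `topicThreadIds.zip(queues)` would be enough.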

        Jun Rao added a comment -

        Thanks for the patch. What was the bug? Is it the implicit function?

        1. reinitializeConsumer(): It seems that the computation for threadQueueStreamPairs is slightly different for WildcardTopicCount and StaticTopicCount. However, it seems that we can just use the same computation.

        Joel Koshy added a comment -

        This was a silly coding bug - the topicThreadIdAndQueues map was getting
        updated incorrectly. This affects both 0.7 and 0.8. The same patch applies to
        both trunk and 0.8.

        The patch fixes the issue. Also got rid of the implicit def in
        reinitializeConsumer.


          People

          • Assignee: Joel Koshy
          • Reporter: Joel Koshy
          • Votes: 0
          • Watchers: 3
