Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: core
    • Labels:
      None

      Description

      A broker could be following multiple topic/partitions from the same source broker. Instead of using 1 fetcher thread per topic/partition, it would be more efficient to use 1 fetcher thread that issues multi-fetch requests.
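      The idea can be sketched as follows: one fetcher thread tracks the next fetch offset for every partition it follows and bundles them into a single multi-fetch request per round trip, instead of one thread (and one request) per partition. This is an illustrative sketch; the class and method names are not Kafka's actual API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: one thread, many partitions, one bundled request.
class MultiFetchSketch {
    // (topic-partition) -> next offset to fetch
    private final Map<String, Long> fetchOffsets = new HashMap<>();

    void addPartition(String topicPartition, long startOffset) {
        fetchOffsets.put(topicPartition, startOffset);
    }

    // Build one request covering every partition this thread follows.
    Map<String, Long> buildMultiFetchRequest() {
        return new HashMap<>(fetchOffsets);
    }

    // Advance the fetch offset for a partition after its data is processed.
    void advance(String topicPartition, long newOffset) {
        fetchOffsets.put(topicPartition, newOffset);
    }
}
```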

      1. kafka-339_v2.patch
        27 kB
        Jun Rao
      2. kafka-339_v1.patch
        28 kB
        Jun Rao

        Activity

        Jun Rao created issue -
        Jun Rao made changes -
        Field Original Value New Value
        Fix Version/s 0.8 [ 12317244 ]
        Jun Rao made changes -
        Assignee Jun Rao [ junrao ]
        Jun Rao made changes -
        Attachment kafka-339_v1.patch [ 12532277 ]
        Jun Rao added a comment -

        Uploaded patch v1.

        Created an AbstractFetcher and AbstractFetcherManager, which contain the common code path for fetchers used in followers and real consumer clients. Added ReplicaFetcher and ReplicaFetcherManager (use multi-fetch) to replace ReplicaFetcherThread.

        Will reimplement the fetcher in consumer client based on AbstractFetcher and AbstractFetcherManager in a separate jira, kafka-362.
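        The refactoring described above has roughly this shape: the abstract base owns the common fetch-and-dispatch loop, while subclasses supply only what happens to the fetched data. Class names mirror the patch description, but the bodies below are illustrative guesses, not the actual Kafka 0.8 code.

```java
import java.util.ArrayList;
import java.util.List;

// Common code path shared by the replica follower and a consumer client.
abstract class AbstractFetcherSketch {
    // Fetch data for each followed partition and hand each chunk to the
    // subclass-specific processPartitionData.
    void dispatch(List<String> partitions) {
        for (String tp : partitions) {
            processPartitionData(tp, fetch(tp));
        }
    }

    // Stand-in for the network multi-fetch; returns dummy bytes here.
    byte[] fetch(String topicPartition) {
        return topicPartition.getBytes();
    }

    // Subclasses decide what to do with the data: the replica fetcher
    // appends it to the local log; a consumer would queue it for the user.
    abstract void processPartitionData(String topicPartition, byte[] data);
}

// Follower-side subclass: record fetched data in a local "log".
class ReplicaFetcherSketch extends AbstractFetcherSketch {
    final List<String> log = new ArrayList<>();

    @Override
    void processPartitionData(String topicPartition, byte[] data) {
        log.add(topicPartition + ":" + data.length);
    }
}
```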

        Jun Rao made changes -
        Status Open [ 1 ] Patch Available [ 10002 ]
        Joel Koshy added a comment -

        Thanks for the patch. Some comments:

        AbstractFetcher:

        • currentOffset should never be empty, so we can get rid of the if.
        • hasPartition, partitionCount need to synchronize fetchMap
        • "shutting down" should probably be removed from the string on line 106
        • newOffset can be computed from the messageSet - so the processPartitionData implementation does not need to return the log end offset (and likewise when we use this in the high-level consumer). It's probably safer to prevent processPartitionData from overriding the new offset, and I don't see any benefit in allowing it to do so.
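        The last point above can be sketched like this: the new fetch offset is derivable from the fetched message set itself (last message offset + 1), so processPartitionData has no need to return one. The helper name and use of logical offsets are assumptions for illustration, not the patch's code.

```java
// Hedged sketch: derive the next fetch offset from the message set.
class OffsetSketch {
    long nextOffset(long[] fetchedMessageOffsets, long currentOffset) {
        if (fetchedMessageOffsets.length == 0)
            return currentOffset; // empty fetch: offset unchanged
        // Next offset is one past the last message received.
        return fetchedMessageOffsets[fetchedMessageOffsets.length - 1] + 1;
    }
}
```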

        AbstractFetcherManager:

        • addFetcher:
          • can rename to maybeAddFetcher
          • also, maybe we should move the info log on line 38 to the None case and print the fetcher id; and add another log for the other case saying there's already a fetcher.
        • What is the purpose of having a fetcher ID vs simply topic-partition?
        • Should synchronize fetcherRunnableMap in shutdown with mapLock

        ReplicaManager:

        • Maybe some corner case that I'm missing, but makeFollower already passes in the new leaderBrokerId so why do we need to re-read from ZooKeeper (line 173)?

        ReplicaFetchTest:

        • Ideally producer.close() should be before the waitUntilTrue
        • The condition function uses & instead of &&.
        • Also, instead of the hard-coded 60L I think it would be clearer and sufficient to do something like:
          val expectedOffset = brokers.head.getLogManager...logEndOffset
          assertEquals(brokers.size, brokers.count( broker => broker.getLogManager...logEndOffset == expectedOffset ))
        Jun Rao added a comment -

        Thanks for the review. Attaching patch v2.

        AbstractFetcher:

        • currentOffset actually can be None. A fetcher can be removed after the multi-fetch request is made.

        AbstractFetcherManager:
        – addFetcher actually always adds a fetcher, but does not always create a new fetcher thread. I see the naming is a bit confusing. Renamed AbstractFetcher to AbstractFetcherThread.
        – FetcherManager maintains 1 or more fetcherThreads per source broker. By default, there is 1 fetcherThread per broker. However, for a higher degree of parallelism, more fetcherThreads can be configured. A fetcher corresponds to the fetching from 1 partition of a topic. Multiple fetchers can be added to a fetcherThread.
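        The mapping described above can be illustrated as follows: each source broker gets a fixed pool of fetcher threads, and a partition is deterministically assigned to one thread (e.g. by hashing), so a single thread multi-fetches many partitions. The pool size and hash scheme here are assumptions, not the patch's exact code.

```java
// Hypothetical sketch of assigning a topic/partition to a fetcher thread.
class FetcherIdSketch {
    final int numFetchersPerBroker;

    FetcherIdSketch(int numFetchersPerBroker) {
        this.numFetchersPerBroker = numFetchersPerBroker;
    }

    // Deterministically pick which fetcher thread owns this partition.
    int fetcherId(String topic, int partition) {
        return Math.abs((topic.hashCode() + partition) % numFetchersPerBroker);
    }
}
```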

        ReplicaManager:
        – We need to get the host/port from ZK for a given broker id. Such information should be cached. Will create a separate jira to address this issue.

        The rest of the comments have been addressed.

        Jun Rao made changes -
        Attachment kafka-339_v2.patch [ 12532630 ]
        Joel Koshy added a comment -

        +1 on v2

        Minor comments:

        • AbstractFetcherManager: may be useful to log the fetcher-id for the topic/partition when adding the fetcher
        • ReplicaFetcherManager: miss-match -> mismatch
        • ReplicaManager: makeFollower: I still don't think we need to look up zk for the leader as the value passed in should be current.
        Neha Narkhede added a comment -

        +1 on v2, assuming Joel's comments are addressed.

        Jun Rao added a comment -

        Thanks for the review. Committed patch v2 with a minor improvement by testing multiple topics in ReplicaFetcherTest.

        As for unnecessary ZK reads in ReplicaManager, this should be fixed as part of kafka-343 when moving the leader election logic to the controller.

        Jun Rao made changes -
        Status Patch Available [ 10002 ] Resolved [ 5 ]
        Resolution Fixed [ 1 ]

          People

          • Assignee:
            Jun Rao
            Reporter:
            Jun Rao
          • Votes:
            0
          • Watchers:
            3

            Dates

            • Created:
              Updated:
              Resolved:

              Time Tracking

              Estimated:
              Original Estimate - 252h
              Remaining:
              Remaining Estimate - 252h
              Logged:
              Time Spent - Not Specified

                Development