Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.8.0
    • Component/s: core
    • Labels:
      None

      Description

      We can assume the leader of a partition is always the first replica. Data will only be stored in the first replica, so there is no fault-tolerance support yet. For now, just make the partition logical.
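The first-replica assumption above can be sketched as follows; `LeaderSelection` and `leaderFor` are illustrative names, not part of the actual Kafka 0.8 code:

```scala
// Minimal sketch of the assumption in the description (illustrative
// names, not the real Kafka classes): with logical partitions and no
// fault tolerance yet, the leader of a partition is simply the first
// replica in its assignment list.
object LeaderSelection {
  // replicas: broker ids assigned to the partition, in assignment order
  def leaderFor(replicas: Seq[Int]): Option[Int] = replicas.headOption
}
```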

      1. kafka-239-latest-revision-unit-tests-passing.patch
        200 kB
        Neha Narkhede
      2. kafka-239-v2.patch
        203 kB
        Neha Narkhede
      3. kafka-239-v3.patch
        206 kB
        Neha Narkhede
      4. kafka-239-v4.patch
        206 kB
        Neha Narkhede

        Issue Links

          Activity

          Neha Narkhede added a comment -

          Quite a few changes -

          1. The producer depends on ZooKeeper to fetch the initial list of brokers in the cluster. After that, it uses the getTopicMetadata API to fetch the topic metadata.
          2. The partitions are made logical. Although it is a bit confusing what the Partition object represents; I think a partition should be associated with a topic and a broker, and have an id.
          3. Removed the ZkLoadbalanceTest and javaapi.PrimitiveApiTest. These were duplicate tests.
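The bootstrap flow in point 1 could be sketched roughly like this; `BrokerSource`, `MetadataClient`, and the `fetch` function are hypothetical stand-ins for the real producer components, not the committed API:

```scala
// Hedged sketch of the flow in point 1: ZooKeeper is consulted only for
// the initial broker list; later lookups go through a
// getTopicMetadata-style call against one of those brokers.
case class Broker(id: Int, host: String, port: Int)
case class PartitionMetadata(partitionId: Int, leader: Broker)

trait BrokerSource {
  def bootstrapBrokers(): Seq[Broker] // backed by ZooKeeper in practice
}

class MetadataClient(source: BrokerSource,
                     fetch: Broker => Map[String, Seq[PartitionMetadata]]) {
  // the broker list is fetched once, at construction time
  private val brokers = source.bootstrapBrokers()

  def topicMetadata(topic: String): Seq[PartitionMetadata] =
    fetch(brokers.head).getOrElse(topic, Seq.empty)
}
```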

          Neha Narkhede added a comment -

          This patch needed some cleanup -

          1. The zk connection was created more than once in the producer. This can be done once, by moving it to the constructor of the producer pool.
          2. Imports need cleanup as well.
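The cleanup in point 1 amounts to hoisting connection creation into the constructor; `ZkConn` and this `ProducerPool` are simplified stand-ins, not the real classes:

```scala
// Sketch of the cleanup in point 1: the ZooKeeper client is created
// exactly once, in the pool's constructor, instead of being re-created
// on each use. ZkConn stands in for the real ZkClient.
class ZkConn(val connectString: String)

class ProducerPool(zkConnect: String) {
  // single shared connection for the lifetime of the pool
  val zkClient: ZkConn = new ZkConn(zkConnect)
}
```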

          Neha Narkhede added a comment -

          1. Imports are cleaned up
          2. zkClient instance creation is moved to the Producer

          Jun Rao added a comment -

          1. ZkUtils.getTopicPartitionsPath: There are a couple of reasons why we chose to use /brokers/topics/partitions to store partition data, instead of /brokers/topics: (1) This distinguishes the ZK layout in 0.8 from 0.7. In 0.7, we already store broker ids directly under /brokers/topics, so there could be confusion if we put partition ids at the same level. (2) This also leaves room for future extension to add non-partition-level per-topic info in ZK.

          2. ZkUtils: rename doesBrokerHostPartition to something like isPartitionOnBroker

          3. LogManager: logFlusherScheduler.startUp is called twice.

          4. LogManager.getOrCreateLog: The LogManager shouldn't need to know anything about ZK. The check of whether a partition exists on a broker should be done at KafkaApi level. Ideally, we just want to check partition ownership from a local cache, which will be populated by ZK listeners (part of kafka-44). In this patch, we can either not check at all or directly check from ZK (with the intention to have it optimized in kafka-44).

          5. KafkaServer: the log cleaner scheduler is scheduled in LogManager; should we start up the scheduler there too? If there is a good reason to do that, should we protect startUp from being called more than once?

          6. KafkaScheduler: what's the benefit of having a startUp method? It seems creating the executor in the constructor is simpler.

          7. Partition should be made logical. It only contains topic and partition id. There will be a new entity Replica which is associated with broker id.

          8. ProducerPool: remove unused import and fix indentation in close()

          9. AsyncProducerTest: There is duplicated code that sets up mocks to form the expected partition metadata. Can we have a separate method to share the code?

          10. Producer: Is zkClient in the constructor just for testing? If so, add a comment to indicate that.

          11. TestUtils: why do we need checkSetEqual? Doesn't scala do that by default?

          12. ZookeeperConsumerConnectorTest.testLederSelectionForPartition: is it really testing leader election? It seems to be testing partition ownership.

          13. ZkUtils.getLeaderForPartition: This is probably fine for this patch. However, we should think whether it is better to get leader info from ZK directly or use the getMetaData api.
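The separation proposed in item 7 could look roughly like the following; these case classes are illustrative, not the classes that were actually committed in 0.8:

```scala
// Sketch of item 7: Partition becomes purely logical, identified only
// by topic and partition id, while the broker association lives in a
// separate Replica entity.
case class Partition(topic: String, partitionId: Int)
case class Replica(brokerId: Int, partition: Partition)
```

With this split, two Partition values with the same topic and id compare equal regardless of which brokers host them, which is the "logical" property the review is asking for.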

          Neha Narkhede added a comment -

          1, 3. Done.
          4. Better done as part of kafka-44. If this is not done at all in this patch, unit tests break left and right. This patch had to change almost all tests; I'd prefer making more test changes in another patch.
          5. Good suggestion.
          6. These changes were required since the current code doesn't allow you to restart a Kafka server by calling shutdown and start on KafkaServer. It might need some cleanup, but it seems wrong that we can't restart a server that has been shut down. Ideally, we need to refactor LogManager, KafkaZookeeper and KafkaServer to remove the interdependencies.
          7. The broker id is left there for now. This can change once the new Replica entity is introduced. Changing it in this patch would again introduce way too many changes; I don't think all the cleanup has to happen in one huge patch, and I'd prefer this change be done in another one.
          8, 9, 10. Done.
          12. It is meant for checking the topic registry to test leader selection. The partition ownership checks are pulled in from ZkLoadBalanceTest.

          Let's keep in mind that this is not the only patch we should do for cleanup and refactoring. It is getting harder to manage and rebase too.

          Neha Narkhede added a comment -

          There is something weird happening with the AutoOffsetResetTest with the latest patch; I will take a look and upload another one.

          Jun Rao added a comment -

          Please address items 2 and 11. For 4, I am fine leaving the ZK check in LogManager for now. Please add a comment that this will be fixed later.

          Unit tests seem to hang consistently for me. Seems to hang on testLatestOffsetResetForward(kafka.integration.AutoOffsetResetTest).

          Neha Narkhede added a comment -

          >> 11. TestUtils: why do we need checkSetEqual? Doesn't scala do that by default?

          This is something you had added in ZookeeperConsumerConnectorTest; it's not part of this patch. I think Scala does it by default, and I'll clean it up as well.

          >> 2. ZkUtils: rename idoesBrokerHostPartition to something like isPartitionOnBroker

          I didn't see any grammatical difference between the two, but if you think the latter is easier to understand, I'll incorporate the change.

          >> Unit tests seem to hang consistently for me. Seems to hang on testLatestOffsetResetForward(kafka.integration.AutoOffsetResetTest).

          Please see my previous update on this JIRA; I've already identified that. Please wait for another patch.

          Neha Narkhede added a comment -

          2. Renamed it to ZkUtils.isPartitionOnBroker
          11. Removed checkSetEquals

          The AutoOffsetResetTest was hanging because it was updating the offset for the wrong ZK path. Fixed that.

          Jun Rao added a comment -

          +1 on the patch.

          For 2, I was interpreting Host as a noun, instead of a verb. Thus the confusion.

          Neha Narkhede added a comment -

          ah, speaking of different perspectives, which is a good thing


            People

            • Assignee:
              Neha Narkhede
              Reporter:
              Jun Rao
            • Votes:
              0
              Watchers:
              0
