Kafka / KAFKA-101

Avoid creating a new topic by the consumer

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.7
    • Fix Version/s: 0.7
    • Component/s: core
    • Labels:

      Description

      Currently, if a consumer consumes a topic and the topic doesn't exist, the topic is created automatically. Sometimes this can be confusing. Often, an ad hoc user may put a wrong topic name in the consumer and thus create many unnecessary topics.
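
The behavior requested above can be illustrated with a minimal sketch. It assumes a hypothetical in-memory `TopicRegistry` (not Kafka's LogManager) in which only the producer path may create a topic, while the consumer path is a pure lookup that returns nothing for an unknown name.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a broker-side log registry; not Kafka's LogManager.
final class TopicRegistry {
  private val logs = mutable.Map.empty[String, mutable.ArrayBuffer[String]]

  // Producer path: creates the topic on first write.
  def append(topic: String, message: String): Unit = {
    val log = logs.getOrElseUpdate(topic, mutable.ArrayBuffer.empty[String])
    log += message
  }

  // Consumer path: a pure lookup that never creates anything.
  def read(topic: String): Seq[String] =
    logs.get(topic).map(_.toList).getOrElse(Nil)

  // Names of all topics that currently exist.
  def topics: Set[String] = logs.keySet.toSet
}
```

With this split, a consumer that mistypes a topic name reads an empty result and leaves no trace on the broker.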

      1. KAFKA-101_v3.patch
        10 kB
        Jun Rao
      2. KAFKA-101-getoffsets.patch
        6 kB
        Taylor Gautier
      3. KAFKA-101_v2.patch
        6 kB
        Jun Rao
      4. 0002-KAFKA-101-Avoid-creating-a-new-topic-by-the-consumer.patch.0.7
        6 kB
        Taylor Gautier

        Activity

        Taylor Gautier added a comment -

        Fixes KAFKA-101 but the core tests for ZK fail - I think because other tests are expecting a getOrCreate call to connect to ZK and it no longer does that.

        Jun Rao added a comment -

        Thanks for the patch.

        The test failures seem to be transient and are the results of time dependencies, which we should fix separately.

        Could you add a unit test on SimpleConsumer to consume a non-existing topic/partition? It should return an empty messageSet. Other than that, the patch looks good.

        Taylor Gautier added a comment -

        I added a test for the empty log in LogManager. You want a similar test in SimpleConsumer also?

        Jun Rao added a comment -

        Yes, since a test in SimpleConsumer will further test the wire protocol. Thanks,

        Taylor Gautier added a comment -

        Sorry, I am not seeing a SimpleConsumer test. Do you have a specific file in mind? Maybe log/LogOffsetTest.scala?

        Jun Rao added a comment -

        We can add it in PrimitiveApiTest.

        Jun Rao added a comment -

        Actually, another thing. If a topic doesn't already exist, the consumer shouldn't create the ZK path /brokers/topics/topic. Currently, we do the following in ZookeeperConsumerConnector.consume:

        ZkUtils.makeSurePersistentPathExists(zkClient, partitionPath)

        We should remove that line. There are a couple of things to check.
        (1) If the consumer can still get the watcher triggered when the topic is created (by a producer). I think this should happen since ZKClient registers an Exists watcher if path doesn't exist. But we should check.
        (2) There are probably places in the rebalance code where we should handle a missing ZK path /brokers/topics/topic properly.
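
Check (1) above can be pictured with a toy model. The sketch below is not the ZooKeeper or ZkClient API; `PathStore`, `watchExists`, and `create` are hypothetical names for an in-memory store in which a watch can be registered on a path that does not exist yet and fires when the path is later created.

```scala
import scala.collection.mutable

// Toy in-memory model of a path store with "exists" watches.
// It is NOT the ZooKeeper or ZkClient API; all names are hypothetical.
final class PathStore {
  private val paths = mutable.Set.empty[String]
  private val watchers = mutable.Map.empty[String, List[String => Unit]]

  // Registers a callback without creating the path, and reports
  // whether the path exists at registration time.
  def watchExists(path: String)(onCreated: String => Unit): Boolean = {
    watchers(path) = onCreated :: watchers.getOrElse(path, Nil)
    paths.contains(path)
  }

  // Producer side: the first creation of a path fires its pending watches once.
  def create(path: String): Unit =
    if (paths.add(path)) watchers.remove(path).getOrElse(Nil).foreach(cb => cb(path))
}
```

In this model the consumer never writes to the store, yet it still learns about the topic once a producer creates the path. Whether the real client behaves the same way is exactly what the comment asks to verify.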

        Jun Rao added a comment -

        Attached patch v2. Took Taylor's patch and added (1) a SimpleConsumer-level test in PrimitiveApiTest and (2) a change so that the consumer no longer creates the topic path under /brokers/topics (only the producer can create it).

        Jun Rao added a comment -

        Taylor, could you review patch v2? Thanks,

        Taylor Gautier added a comment -

        I realized on Friday that I had neglected to fix getOffsets. So calling getOffsets on a non-existent topic creates a file too. I wrote the fix yesterday - I will see if I can merge the two together and post an updated patch with tests.

        Taylor Gautier added a comment -

        Ok - your commit looks good. I've added the get offsets patch which should be applied after yours.

        Jun Rao added a comment -

        Merged the v2 patch and the getoffsets patch into a v3 patch.
        1. Got rid of the unused method LogManager.getLogOrGenerateFake.
        2. If the topic hasn't been created, probably only getting the latest or the earliest offset makes sense. Otherwise, we should return an empty array.
        3. Optimized the imports in several files.
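
One possible reading of item 2 is sketched below. The sentinel values `LatestTime` and `EarliestTime`, the helper name `offsetsForMissingTopic`, and the choice of offset 0 for a not-yet-created topic are all assumptions made for illustration; the comment itself does not state them.

```scala
object OffsetSketch {
  // Sentinel timestamps; assumed to mirror Kafka's OffsetRequest sentinels.
  val LatestTime: Long = -1L
  val EarliestTime: Long = -2L

  // Hypothetical helper: offsets to report for a topic that has no log yet.
  def offsetsForMissingTopic(time: Long): Array[Long] = time match {
    // Assumption: a topic that does not exist yet starts at offset 0.
    case LatestTime | EarliestTime => Array(0L)
    // Any concrete timestamp has no matching segment, so nothing is returned.
    case _ => Array.empty[Long]
  }
}
```

The point of the design is that the request is answered without creating a log file for the topic as a side effect.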

        Taylor Gautier added a comment -

        LGTM. I tried to make Log.getLog(...) more Scala-like by having it return Option[Log], but that somehow caused other tests to fail; my Scala-fu is not yet strong.
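
For readers unfamiliar with the idea, an Option-returning lookup might look roughly like the sketch below. The `Log` case class, `LogLookup`, and `FetchSketch` are hypothetical types invented here; they do not reproduce the real Log or LogManager signatures.

```scala
// Hypothetical types for illustration; not Kafka's Log or LogManager.
final case class Log(topic: String, partition: Int, sizeInBytes: Long)

final class LogLookup(existing: Map[(String, Int), Log]) {
  // Absence is part of the return type, so callers must handle it explicitly.
  def getLog(topic: String, partition: Int): Option[Log] =
    existing.get((topic, partition))
}

object FetchSketch {
  // Example caller: a fetch against a missing log reads zero bytes.
  def readableBytes(lookup: LogLookup, topic: String, partition: Int): Long =
    lookup.getLog(topic, partition).fold(0L)(_.sizeInBytes)
}
```

A change of this kind touches every call site of the lookup, which may be one reason a partial conversion breaks unrelated tests.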

        Jun Rao added a comment -

        Thanks, Taylor. Just committed this.

        Taylor Gautier added a comment -

        great!

        Taylor Gautier added a comment -

        So - I rolled out my version of this patch, which was applied to 0.6. In dev and stage, it worked just great. In production, it appears we ended up with a very mysterious occurrence.

        Topics appeared to be created and the first message delivered was corrupted. Instead of having the usual binary data at the start of a topic file, the beginning of the file held the partial contents of the first message.

        I'm going to start looking into how that might even be possible. I thought I would post here in case you saw this problem at some other stage of development and/or fixed it, which might help me pinpoint which part of the code I mistakenly broke.

        To be clear, if the first message delivered was let's say "One fine day in the middle of the night" then the contents of our topic files appeared as so:

        -----------------------------------
        ay in the middle of the night
        ------------------------------------

        When it should have been something like:
        ---------------------------------
        [binary stuff]One fine day in the middle of the night
        ------------------------------------

        Jun Rao added a comment -

        Taylor, this would suggest a synchronization issue during append to the log. Looking at the code, the append is always synchronized on the lock of the segment file. Could you reproduce the problem on 0.7?
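
The kind of synchronization described above can be sketched as follows. This is a toy `Segment` class, not Kafka's segment code, and the 4-byte length header is an assumed framing chosen only to stand in for the "binary stuff" that precedes each message.

```scala
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer

// Toy segment: each record is a 4-byte length header followed by the payload.
// The framing is an assumption for illustration, not Kafka's on-disk format.
final class Segment {
  private val lock = new Object
  private val out = new ByteArrayOutputStream()

  // Header and payload are written under one lock so that concurrent
  // appends cannot interleave their bytes.
  def append(payload: Array[Byte]): Unit = lock.synchronized {
    out.write(ByteBuffer.allocate(4).putInt(payload.length).array())
    out.write(payload)
  }

  // Snapshot of everything appended so far.
  def bytes: Array[Byte] = lock.synchronized(out.toByteArray)
}
```

If the header and payload were written outside a common lock, or if two code paths opened the same file independently, a record could end up without its header, which would resemble the truncated file contents reported above.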

        Taylor Gautier added a comment -

        Unfortunately, I cannot. I have gone ahead and updated our clients to speak 0.7 properly, so we are going to move soon. However, it's going to be hard to justify rolling out just 0.7 with this patch, without any changes, "just to see" if the problem resurfaces. We might go ahead and do it on one machine. Anyway, that will have to wait until sometime next week at best, since we will have to roll out updated clients that can speak 0.6 and 0.7 first.


          People

          • Assignee: Unassigned
          • Reporter: Jun Rao