SAMZA-226

Auto-create changelog streams for kv

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.9.0
    • Fix Version/s: 0.9.0
    • Component/s: container, kv
    • Labels: None

    Description

      Currently, changelog topics are not auto-created. This is a frustrating user experience, and there are a few useful defaults that should be set that are not obvious when creating Kafka topics with log compaction enabled.

      We should have Samza auto-create changelog streams for the kv stores that have changelogs enabled.

      In Kafka's case, the changelog topics should be created with compaction enabled. They should also be created with a smaller (100 MB) default segment.bytes setting, which is useful for low-volume changelogs. The problem we've seen in the past is that the default log.segment.bytes is 1 GB, and Kafka's compaction implementation NEVER touches the most recent log segment. This means that, if you have a very small state store but execute a lot of deletes/updates (e.g. you've only got maybe 25 MB of active state, but are deleting and updating it frequently), you can end up with close to 1 GB of state to restore, since the most recent segment always contains non-compacted writes and can grow to the full segment size. This is silly since your active (compacted) state is really only ~25 MB. Shrinking the segment bytes means that you'll have a smaller maximum data size to restore. The trade-off is that we'll have more segment files for changelogs, which will increase the number of open file handles.
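To make the trade-off concrete, here is a rough back-of-the-envelope sketch. It assumes a simplified model in which older segments are fully compacted and only the active segment holds uncompacted writes; it ignores Kafka's cleaner thresholds, and the class and method names are hypothetical.

```java
final class ChangelogSegmentMath {
    static final long MIB = 1024L * 1024L;

    /**
     * Rough upper bound on the bytes replayed at restore time under the
     * simplified model: the compacted state plus one full active segment,
     * which compaction never touches.
     */
    static long worstCaseRestoreBytes(long compactedStateBytes, long segmentBytes) {
        return compactedStateBytes + segmentBytes;
    }

    /** Number of segment files needed to hold logBytes, rounded up. */
    static long segmentCount(long logBytes, long segmentBytes) {
        return (logBytes + segmentBytes - 1) / segmentBytes;
    }

    public static void main(String[] args) {
        long activeState = 25 * MIB;               // the ~25 MB example from the description
        long[] segmentSizes = {1024 * MIB, 100 * MIB};
        for (long segmentBytes : segmentSizes) {
            System.out.printf(
                "segment.bytes=%d MiB: restore up to %d MiB, %d segment files for a 10 GiB log%n",
                segmentBytes / MIB,
                worstCaseRestoreBytes(activeState, segmentBytes) / MIB,
                segmentCount(10 * 1024 * MIB, segmentBytes));
        }
    }
}
```

Under this model the smaller segment size cuts the restore bound by roughly an order of magnitude, at the cost of about ten times as many segment files for a log of the same size.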

      The trick is doing this in a generic way, since we are supporting changelogs for more than just Kafka systems. I think the interface to do the stream creation belongs in the SystemAdmin interface. It would be nice to have a generic SystemAdmin.createStream() interface, but this would require giving it kafka-specific configuration. Another option is to have SystemAdmin.createChangelogStream, but this seems a bit hacky at first glance. We need to think this part through.

      martinkl, in hello-samza, how are we creating log compacted state stores with the appropriate number of partitions? Is this handled as part of bin/grid?

      Attachments

        1. rb28016.patch
          31 kB
          Naveen Somasundaram
        2. rb28016 (1).patch
          33 kB
          Naveen Somasundaram
        3. rb28016 (2).patch
          33 kB
          Naveen Somasundaram
        4. rb29012.patch
          40 kB
          Naveen Somasundaram

          Activity

            martinkl Martin Kleppmann added a comment -

            AFAIK we're not creating any Kafka topics explicitly in hello-samza. Doing so would actually be a bit tricky, because of an issue mentioned in this comment on SAMZA-152: ZK and Kafka broker need to be up before topics can be created, but a script can't tell whether that is the case. See also this email.

            I think it would be good to add stream creation to the SystemAdmin interface. I prefer SystemAdmin.createChangelogStream over leaking Kafka-specific configuration into samza-core. The state changelog is a fundamental concept in Samza (whereas Kafka is supposed to be completely pluggable), so I don't think it's a problem to have a method like SystemAdmin.createChangelogStream in the Samza API.

            criccomini Chris Riccomini added a comment -

            AFAIK we're not creating any Kafka topics explicitly in hello-samza. Doing so would actually be a bit tricky, because of an issue mentioned in this comment on SAMZA-152: ZK and Kafka broker need to be up before topics can be created, but a script can't tell whether that is the case. See also this email.

            So it sounds like we're just lucking out and using the default partition count for both the state job in hello-samza, and also its changelog topic. I'm guessing we're not turning on log compaction, either. This means that the changelog topic in hello-samza is having its oldest log segments deleted (time-based retention), which could lead to data loss, since an old key would be dropped and never re-added. That said, for the demo, it's OK for now. It will get fixed as part of this ticket.

            I think it would be good to add stream creation to the SystemAdmin interface. I prefer SystemAdmin.createChangelogStream over leaking Kafka-specific configuration into samza-core. The state changelog is a fundamental concept in Samza (whereas Kafka is supposed to be completely pluggable), so I don't think it's a problem to have a method like SystemAdmin.createChangelogStream in the Samza API.

            I'm leaning this way, as well. Let the individual system decide what the appropriate configurations are for a changelog stream.

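The design both commenters lean towards can be sketched roughly as follows. This is an illustration only: ChangelogAdmin is a hypothetical, trimmed-down stand-in for the SystemAdmin interface, the method signature is an assumption rather than the one that was eventually committed, and KafkaLikeAdmin records settings in memory instead of talking to a broker.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ChangelogAdminSketch {
    /** Hypothetical stand-in for the admin interface discussed in the thread. */
    interface ChangelogAdmin {
        void createChangelogStream(String streamName, int numPartitions);
    }

    /** Kafka-flavoured admin: the topic settings live here, not in the caller. */
    static final class KafkaLikeAdmin implements ChangelogAdmin {
        final Map<String, Map<String, String>> created = new LinkedHashMap<>();

        @Override
        public void createChangelogStream(String streamName, int numPartitions) {
            Map<String, String> settings = new LinkedHashMap<>();
            // "partitions" is bookkeeping for this sketch, not a Kafka topic setting.
            settings.put("partitions", Integer.toString(numPartitions));
            // Topic-level settings the ticket asks for: compaction and ~100 MB segments.
            settings.put("cleanup.policy", "compact");
            settings.put("segment.bytes", Long.toString(100L * 1024 * 1024));
            created.put(streamName, settings); // a real admin would call the broker here
        }
    }

    /** Generic caller: it only knows that a changelog stream is needed. */
    static void ensureChangelog(ChangelogAdmin admin, String stream, int partitions) {
        admin.createChangelogStream(stream, partitions);
    }

    public static void main(String[] args) {
        KafkaLikeAdmin admin = new KafkaLikeAdmin();
        ensureChangelog(admin, "wikipedia-stats-changelog", 4);
        System.out.println(admin.created);
    }
}
```

The point of the split is that the generic caller never sees a Kafka setting; a system without compaction could implement the same method as a no-op.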
            closeuris Yan Fang added a comment -

            Planning to touch this ticket. If my understanding is correct, the following steps will be needed:
            1. add the SystemAdmin.createChangelogStream API in SystemAdmin.
            2. implement this API in the Kafka System. Call this method whenever the changelog is enabled. In the implementation, we create the topic if it does not exist and accept the configuration for segment.bytes. Will not consider the situation like Martin mentioned here, where the users use the same changelog topic for the different applications.
            3. add the log compaction example/configuration in the hello-samza

            Any suggestions? Or any pieces I am missing? If all sound good, will start working on it. Thank you.

            criccomini Chris Riccomini added a comment -

            If all sound good, will start working on it. Thank you.

            Yep, this sounds correct.

            One tricky part is going to be figuring out where to call SystemAdmin.createChangelogStream. The most logical place seems to be in TaskStorageManager.

            One thing to be careful of here is that there's one TaskStorageManager per container, which means the createChangelogStream might be executed multiple times on job start. We have this same problem with the KafkaCheckpointManager, which tries to create its checkpoint topic in the same way. You'll have to follow the same pattern as the KafkaCheckpointManager, which is to pessimistically try and create the change log in every task storage manager, and catch the "topic already exists" exception if the creation fails. You CANNOT fetch the topic's metadata (TopicMetadataRequest) before trying to create the changelog, because Kafka will AUTOMATICALLY create the topic if it doesn't exist when the topic metadata request is sent.

            Have a look at the KafkaCheckpointManager's createTopic code, and also SAMZA-289 for the edge case that I'm talking about.

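The create-first, tolerate-"already exists" pattern described above might look roughly like this. It is a sketch under stated assumptions: FakeTopicAdmin and TopicAlreadyExistsException are hypothetical in-memory stand-ins, not Kafka or Samza classes, and the real KafkaCheckpointManager code is not reproduced here.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class PessimisticCreateSketch {
    /** Hypothetical stand-in for the broker's "topic already exists" error. */
    static final class TopicAlreadyExistsException extends RuntimeException {
        TopicAlreadyExistsException(String topic) {
            super("topic already exists: " + topic);
        }
    }

    /** Hypothetical in-memory stand-in for a topic-creation API. */
    static final class FakeTopicAdmin {
        private final Set<String> topics = ConcurrentHashMap.newKeySet();

        void createTopic(String name) {
            // Set.add is atomic here, so exactly one concurrent caller wins.
            if (!topics.add(name)) {
                throw new TopicAlreadyExistsException(name);
            }
        }

        int topicCount() {
            return topics.size();
        }
    }

    /**
     * Always attempts the create and never checks for existence first, since a
     * metadata lookup could itself auto-create the topic with default settings.
     * Returns true if this caller created the topic, false if it already existed.
     */
    static boolean createChangelogIfMissing(FakeTopicAdmin admin, String topic) {
        try {
            admin.createTopic(topic);
            return true;
        } catch (TopicAlreadyExistsException e) {
            return false; // another container got there first; that is fine
        }
    }

    public static void main(String[] args) {
        FakeTopicAdmin admin = new FakeTopicAdmin();
        // Two containers starting the same job both try to create the changelog.
        System.out.println(createChangelogIfMissing(admin, "wikipedia-stats-changelog"));
        System.out.println(createChangelogIfMissing(admin, "wikipedia-stats-changelog"));
        System.out.println(admin.topicCount());
    }
}
```

Only the "already exists" error is swallowed; any other failure still propagates to the caller, which is where a retry policy would belong.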
            closeuris Yan Fang added a comment -

            Great. Very helpful information. Will keep that in mind when implementing. Thank you.

            cpsoman Chinmay Soman added a comment -

            closeuris: Have you already started working on this? Otherwise Naveen can pick it up (since he does not have any tasks at the moment).

            closeuris Yan Fang added a comment -

            Still working on SAMZA-310. Sure, go ahead. One tip:

            One thing to be careful of here is that there's one TaskStorageManager per container, which means the createChangelogStream might be executed multiple times on job start. We have this same problem with the KafkaCheckpointManager, which tries to create its checkpoint topic in the same way. You'll have to follow the same pattern as the KafkaCheckpointManager, which is to pessimistically try and create the change log in every task storage manager, and catch the "topic already exists" exception if the creation fails. You CANNOT fetch the topic's metadata (TopicMetadataRequest) before trying to create the changelog, because Kafka will AUTOMATICALLY create the topic if it doesn't exist when the topic metadata request is sent

            In SAMZA-310, I have methods in org.apache.samza.util.KafkaUtil to deal with the above situation (not merged yet). Maybe you want to reuse them later, but it should not block anything.

            naveenatceg Naveen Somasundaram added a comment - - edited

            In the current implementation, the user configures a topic name explicitly in order to enable the changelog topic. While that makes sense, in an effort to move toward a zero-conf model, do you guys think it makes sense to derive the topic name from a default prefix plus the store name, and create the topic without the user having to specify it explicitly? Having to understand the need for a changelog in order to use Samza seems a rather heavyweight user experience. If the user really doesn't care about the state being consistently persisted, then we can provide an option to turn off the changelog (I am guessing purely for performance reasons). What do you guys think?

            Anyway, for the first patch, I'm just going to auto-create the topic from user specified configurations.

            martinkl Martin Kleppmann added a comment -

            do you guys think it makes sense to have a default prefix to a store name and create the topic without the user having to specify it explicitly?

            I do find it weird how the configuration is currently inconsistent: task.checkpoint.system takes only a system name and auto-generates the stream name, whereas stores.*.changelog takes a system-stream pair.

            Perhaps here the argument was that it can make sense for one job to consume another job's changelog stream (it is, after all, a meaningful stream of state changes) whereas it usually doesn't make much sense to consume another job's checkpoint stream (except perhaps for monitoring purposes). So a checkpoint stream is more of an implementation detail, a changelog stream is more like a public-facing interface of a job. If you put it that way, it makes sense to be able to explicitly configure the changelog stream name.

            However, IMHO the inconsistency between the two configs, and the danger of bad copy-and-paste (if you have two different jobs writing to the same changelog stream, the result is havoc, so blindly copying one job's store config into another job is really bad), leads me to think that autogenerated changelog stream names would be better. We could still have an option to override the stream name, but it probably makes more sense to generate it from the job name and job ID by default.
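A default-with-override lookup along the lines Martin suggests could be sketched like this. The generated name format, the fallback of "1" for a missing job.id, and the helper itself are assumptions made for illustration; the thread defers the actual naming scheme to a follow-up ticket.

```java
import java.util.HashMap;
import java.util.Map;

final class ChangelogStreamNames {
    /**
     * Returns the changelog stream name for a store: the explicitly configured
     * one if present, otherwise a name generated from the job name and job ID.
     */
    static String changelogStream(Map<String, String> config, String storeName) {
        String override = config.get("stores." + storeName + ".changelog");
        if (override != null && !override.isEmpty()) {
            // The configured value is a system-stream pair such as "kafka.my-topic".
            int dot = override.indexOf('.');
            return dot >= 0 ? override.substring(dot + 1) : override;
        }
        String jobName = config.get("job.name");
        if (jobName == null) {
            throw new IllegalArgumentException("job.name is required");
        }
        String jobId = config.getOrDefault("job.id", "1"); // assumed fallback
        // Hypothetical format: two jobs can no longer collide by copy-and-paste.
        return jobName + "-" + jobId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("job.name", "wikipedia-stats");
        System.out.println(changelogStream(config, "wikipedia-stats"));

        config.put("stores.wikipedia-stats.changelog", "kafka.custom-changelog");
        System.out.println(changelogStream(config, "wikipedia-stats"));
    }
}
```

Because the job name and ID are part of the generated name, copying one job's store config into another job no longer points both jobs at the same stream unless the override is copied too.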

            criccomini Chris Riccomini added a comment -

            Defaulting to an auto-generated name seems reasonable to me. naveenatceg, can you open a separate ticket for that?

            naveenatceg Naveen Somasundaram added a comment -

            Review Board: https://reviews.apache.org/r/28016/

            criccomini Chris Riccomini added a comment -

            Looks like there's a compilation error:

            $ bin/check-all.sh
            ....
            [ant:scalac] /Users/criccomi/Code/incubator-samza/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala:88: error: recursive variable systemAdmin needs type
            [ant:scalac]       var systemAdmin = systemAdmins.getOrElse(systemStream.getSystem, throw new SamzaException("Unable to get systemAdmin for store %s " + storeName + " with systemAdmin " + systemAdmin))
            [ant:scalac]                                                                                                                                                                                ^
            [ant:scalac] one error found

            naveenatceg Naveen Somasundaram added a comment -

            Attached another patch with review comments addressed.

            naveenatceg Naveen Somasundaram added a comment -

            Attached another patch! (The previous one fails the Scala 2.9 build because of some inner class issues, as pointed out by Chris.)

            criccomini Chris Riccomini added a comment -

            +1 Merged and committed to 0.8.0. Tests pass. Validated that TestStatefulTask creates a changelog the first time, and validates it the second.

            Please open two follow-on JIRAs:

            1. Merge patch for master.
            2. Use a default changelog stream name when one is not configured.

            criccomini Chris Riccomini added a comment -

            Also, this patch will break hello-samza (latest). Please open a JIRA and fix that as well.

            martinkl Martin Kleppmann added a comment -

            What is the issue with hello-samza? I tried running it just now, and found that the wikipedia-stats job wouldn't start. The message in the log was: "KafkaSystemAdmin [WARN] Failed to create topic wikipedia-stats-changelog: kafka.admin.AdminOperationException: replication factor: 2 larger than available brokers: 1. Retrying."

            I tried adding stores.wikipedia-stats.changelog.replication.factor=1 to the job config but it made no difference.

            martinkl Martin Kleppmann added a comment -

            Oh, I figured it out after reading the code. You need to set stores.wikipedia-stats-changelog.changelog.replication.factor=1, because you're using the changelog topic name ("wikipedia-stats-changelog"), not the store name ("wikipedia-stats"), when reading the configuration. I assume that's a bug?

            naveenatceg Naveen Somasundaram added a comment -

            Hey martinkl, Chris was talking about the change in the SystemAdmin interface, which could have broken hello-samza because it implements SinglePartitionWithoutOffsetsSystemAdmin. However, it does not break, because SinglePartitionWithoutOffsetsSystemAdmin, which also implements the SystemAdmin interface, has been changed to implement the new method ("createChangelogStream").

            martinkl Martin Kleppmann added a comment -

            Ok, the API change doesn't affect hello-samza, but the default replication factor of 2 does affect hello-samza because it runs Kafka in a single-node config by default. At the moment the changelog.replication.factor config is broken, because it uses the stream name rather than the store name. I think we need to fix this (it's really confusing, and it contradicts the documentation), so I'm reopening this issue.
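The mismatch Martin describes can be reproduced with a small lookup sketch. The key pattern and the default of 2 come from the thread; the helper class is hypothetical and is not the patch's actual code.

```java
import java.util.HashMap;
import java.util.Map;

final class ChangelogReplicationConfig {
    static final int DEFAULT_REPLICATION_FACTOR = 2; // default reported in the thread

    /** Builds the config key for the given name (store name, per the documentation). */
    static String key(String name) {
        return "stores." + name + ".changelog.replication.factor";
    }

    /** Looks up the replication factor, falling back to the default when unset. */
    static int replicationFactor(Map<String, String> config, String name) {
        String value = config.get(key(name));
        return value == null ? DEFAULT_REPLICATION_FACTOR : Integer.parseInt(value.trim());
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("stores.wikipedia-stats.changelog.replication.factor", "1");

        // Documented behaviour: key built from the store name, so the override is found.
        System.out.println(replicationFactor(config, "wikipedia-stats"));

        // Reported bug: key built from the topic name, so the override is missed
        // and the default is used, which a single-broker setup cannot satisfy.
        System.out.println(replicationFactor(config, "wikipedia-stats-changelog"));
    }
}
```

Looking the value up by topic name silently falls back to the default, which is why adding the documented key made no difference on a single-broker hello-samza setup.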

            martinkl Martin Kleppmann added a comment -

            A few more comments/questions.

            • A goal of this jira was to auto-create changelog topics with compaction enabled and a smaller segment size. If I read this patch correctly, those defaults aren't actually being set in the current implementation. I think that would be worth doing. (KafkaCheckpointManagerFactory does this, perhaps you can share implementation between checkpointing and changelogging.)
            • You introduced new configuration parameters stores.%s.changelog.kafka.* but have not added them to the documentation.
            • I've also added a few comments on the RB.

            Sorry for not getting to review this earlier; I only saw the issues on the 0.8.0 RC. I think we should either fix these issues for 0.8.0, or revert this patch for 0.8.0 and release a fixed version on 0.8.1.

            criccomini Chris Riccomini added a comment -

            At the moment the changelog.replication.factor config is broken, because it uses the stream name rather than the store name. I think we need to fix this (it's really confusing, and it contradicts the documentation), so I'm reopening this issue.

            +1

            I think we should either fix these issues for 0.8.0, or revert this patch for 0.8.0 and release a fixed version on 0.8.1.

            I'm leaning towards a revert. Either way, 0.8.0 is going to require a new RC, and another 72h+ vote. But I don't want to add another week's worth of wait time on top of that to get the patch updated. Given that this ticket was optional in 0.8.0 to begin with, I'd say we should revert and fix before the next release. Thoughts?

            martinkl Martin Kleppmann added a comment -

            I'd say we should revert and fix before the next release.

            Fine with me.

            criccomini Chris Riccomini added a comment -

            Reverted.

            naveenatceg Naveen Somasundaram added a comment -

            New RB with comments addressed here:
            https://reviews.apache.org/r/29012/

            criccomini Chris Riccomini added a comment -

            LGTM. Ran bin/check-all.sh and all tests pass. martinkl, will let this linger for 24h before committing, in case you want to have another look.

            criccomini Chris Riccomini added a comment -

            naveenatceg, please post patch on this JIRA.

            naveenatceg Naveen Somasundaram added a comment -

            Attached patch rb29012.patch.

            criccomini Chris Riccomini added a comment -

            Merged and committed to master.

            People

              Assignee: naveenatceg Naveen Somasundaram
              Reporter: criccomini Chris Riccomini
              Votes: 1
              Watchers: 7
