Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels:

      Description

      AWS Kinesis is a publish-subscribe message broker service quite similar to Kafka, provided as a hosted service by Amazon. I have spoken to a few people who are interested in using Kinesis with Samza, since the options for stateful stream processing with Kinesis are currently quite limited. Samza's local state support would be great for Kinesis users.

      I've looked a little into what it would take to support Kinesis in Samza. Useful resources:

      Kinesis is similar to Kafka in that it has total ordering of messages within a partition (which Kinesis calls a "shard"). The most notable differences I noticed are:

      • Kinesis does not support compaction by key, and only keeps messages for 24 hours (the "trim horizon"). Thus it cannot be used for checkpointing and state store changelogging. Another service must be used for durable storage (Amazon recommends DynamoDB).
      • It is common for the number of shards in a stream to change ("resharding"), because a Kinesis shard is a unit of resourcing, not a logical grouping. A Kinesis shard is more like a Kafka broker node, not like a Kafka partition.

      The second point suggests that Kinesis shards should not be mapped 1:1 to Samza StreamTasks like we do for Kafka, because whenever the number of shards changes, any state associated with a StreamTask would no longer be in the right place.

      Kinesis assigns a message to a shard based on the MD5 hash of the message's partition key (so all messages with the same partition key are guaranteed to be in the same shard). Each shard owns a continuous range of the MD5 hash space. When the number of shards is increased by one, a shard's hash range is subdivided into two sub-ranges. When the number of shards is decreased by one, two adjacent shards' hash ranges are merged into a single range.

      I think the nicest way of modelling this in Samza would be to create a fixed number of StreamTasks (e.g. 256, but make it configurable), and to assign each task a fixed slice of this MD5 hash space. Each Kinesis shard then corresponds to a subset of these StreamTasks, and the SystemConsumer implementation routes messages from a shard to the appropriate StreamTask based on the hash of the message's partition key. This implies that all the StreamTasks for a particular Kinesis shard should be processed within the same container. This is not unlike the Kafka consumer in Samza, which fetches messages for all of a container's Kafka partitions in one go.

      This removes the semantic problem of resharding: we can ensure that messages with the same partition key are always routed to the same StreamTask, even across shard splits and merges.
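      To illustrate the routing idea, here is a minimal, hypothetical sketch (not part of Samza; the class and method names are made up) of how a consumer could slice the 128-bit MD5 hash space evenly across a configurable number of StreamTasks:

          import java.math.BigInteger;
          import java.nio.charset.StandardCharsets;
          import java.security.MessageDigest;
          import java.security.NoSuchAlgorithmException;

          // Hypothetical sketch: route a Kinesis message to one of a fixed number of
          // StreamTasks by slicing the 128-bit MD5 hash space evenly.
          public class HashSpaceRouter {
            private static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128); // 2^128

            private final int numTasks; // e.g. 256, taken from job config

            public HashSpaceRouter(int numTasks) {
              this.numTasks = numTasks;
            }

            /** Returns the index of the StreamTask that owns this partition key. */
            public int taskFor(String partitionKey) {
              try {
                byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
                // Interpret the MD5 digest as an unsigned 128-bit integer, as Kinesis does.
                BigInteger hash = new BigInteger(1, digest);
                // Each task owns a contiguous slice of size 2^128 / numTasks.
                return hash.multiply(BigInteger.valueOf(numTasks)).divide(HASH_SPACE).intValue();
              } catch (NoSuchAlgorithmException e) {
                throw new AssertionError("MD5 is always available", e);
              }
            }
          }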

      However, there are still some tricky edge cases to handle. For example, if Kinesis decides to merge two shards that are currently processed by two different Samza containers, what should Samza do? A simple (but perhaps a bit wasteful) solution would be for both containers to continue consuming the merged shard. Alternatively, Samza could reassign some StreamTasks from one container to another, but that would require any state to be moved or rebuilt. Probably double-consuming would make most sense for a first implementation.

      In summary, it looks like Kinesis support is feasible, and would be a fun challenge for someone to take on. Contributions welcome!

        Activity

        criccomini Chris Riccomini added a comment -

        Thanks for looking into this. The sharding strategy that Kinesis uses seems quite interesting and unique.

        However, there are still some tricky edge cases to handle.

        I was actually thinking of the reverse of your edge case: what happens when a shard overlaps two StreamTasks, or do we even allow this? If a shard's range is 0-100, and we have two StreamTasks responsible for ranges 0-75 and 76-150, respectively, who processes the shard? We could suppose that we always bind to the lower of the two StreamTasks by range (0-75), but what happens if 0-100 splits into 0-20, 21-40, 41-60, 61-80, and 81-100 (a 5x split)? Should 81-100 continue being assigned to the 0-75 StreamTask? How can we know, next time around, that this is the assignment we should make? Perhaps the assignments could be done via the CoordinatorStream, the same way that we handle changelog-partition assignments now.

        We could also force the StreamTask count to == the shard count at the time the job first starts. This would fix shard splitting, since the single StreamTask could continue to process all sub-shards that fall out of the initial shard that existed at startup, but it doesn't fix the problem you mention about shard merging.

        Also, what happens to a Kinesis consumer in a case where it's consuming a shard and the shard suddenly splits? In Samza's model, we'd want the JobCoordinator to shut down all containers, re-assign partitions (shards), then start the containers back up again.

        martinkl Martin Kleppmann added a comment -

        "Quite interesting and unique" is a polite way of putting it. I'd have said "made the same mistake as Cassandra before version 1.2" — not separating logical partitioning from resource allocation

        If a shard's range is 0-100, and we have two StreamTasks responsible for ranges 0-75 and 76-150, respectively, who processes the shard?

        If the two StreamTasks are in the same container, that container can consume the shard, and demultiplex based on each message's partition key. If the two StreamTasks are in different containers, both containers would need to consume the shard, and discard the messages that are addressed to the other one. This is a bit wasteful, but would be logically correct.
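        As a sketch of that demultiplexing step (again hypothetical, building on the router sketch above): a container consuming a shard would keep only the messages whose partition key hashes into one of its own StreamTasks, and discard the rest.

            import java.util.Set;

            // Illustrative only: how a container consuming a shard could demultiplex messages,
            // keeping those whose partition key hashes to one of its own StreamTasks and
            // discarding the rest (the double-consumption fallback described above).
            public class ShardDemultiplexer {
              private final HashSpaceRouter router;        // from the earlier sketch
              private final Set<Integer> tasksInThisContainer;

              public ShardDemultiplexer(HashSpaceRouter router, Set<Integer> tasksInThisContainer) {
                this.router = router;
                this.tasksInThisContainer = tasksInThisContainer;
              }

              /** Returns true if this container should process the message, false to discard it. */
              public boolean shouldProcess(String partitionKey) {
                return tasksInThisContainer.contains(router.taskFor(partitionKey));
              }
            }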

        what happens if 0-100 splits into 0-20, 21-40, 41-60, 61-80, and 81-100 (a 5x split)?

        I believe splits and merges are always two-way. If we make the number of StreamTasks a power of 2, then it seems quite likely that the StreamTasks' hash range boundaries will be aligned with the Kinesis shard boundaries. (Since the keys are hashed, there should be no reason to split a shard any other way than exactly in half.) If people do stupid things with their shard splitting, we can still fall back to double-consuming a shard.

        We could also force the StreamTask count to == the shard count at the time the job first starts.

        This would not be ideal, because billing is by number of shards. People might well start off with only one shard, to keep it cheap. This would limit them to one Samza container, even as throughput grows.

        Also, what happens to a Kinesis consumer in a case where it's consuming a shard and the shard suddenly splits? In Samza's model, we'd want the JobCoordinator to shut down all containers, re-assign partitions (shards), then start the containers back up again.

        I was told that AWS is introducing auto-scaling for Kinesis, which means that the number of shards may well change quite often (even depending on time of day). Restarting the job every time would not be ideal.

        I'd be inclined to leave the Samza containers unchanged, and have the consumer of the parent shard (the one that was split) continue by consuming the two child shards (the shards that were created in the split). If the job can't keep up, the user can choose to increase the container count, but that wouldn't happen automatically.

        For merging, if the two merged shards are consumed by the same container, no restart is necessary, as described above. If we fall back to double-consuming a shard (because the merged shards were in different containers), a job restart would be necessary to rebalance the tasks. We can probably minimize the chances of double-consuming by assigning StreamTasks for adjacent ranges of hashes to the same container.

        If the number of containers is a power of 2, this ought to work nicely. Say you have 256 StreamTasks, 8 containers, 8 Kinesis shards. Each message goes to the StreamTask according to the first byte of its hash. If StreamTasks 0 to 31 are in container 0, StreamTasks 32 to 63 in container 1, etc, and if shards always split their range in half, then you'll end up with each container consuming exactly one Kinesis shard.
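        To make that arithmetic concrete, here is a tiny illustrative example (not Samza code; the partition key is made up): the first byte of the MD5 digest picks the StreamTask, and dividing the task index by 32 picks the container that hosts it.

            import java.nio.charset.StandardCharsets;
            import java.security.MessageDigest;

            // Worked example of the layout above: 256 StreamTasks, 8 containers.
            public class LayoutExample {
              public static void main(String[] args) throws Exception {
                String partitionKey = "user-42";  // made-up example key
                byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
                int task = digest[0] & 0xFF;   // 0..255: StreamTask index
                int container = task / 32;     // 0..7: container index (32 contiguous tasks each)
                System.out.println("task=" + task + " container=" + container);
                // If shards always split their hash range exactly in half, a shard covering the
                // top 1/8th of the space (first byte 0xE0..0xFF) maps onto tasks 224..255,
                // which all live in container 7.
              }
            }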

        martinkl Martin Kleppmann added a comment -

        I spent a while with Ian Meyers today, working through some of the details of what Kinesis support for Samza could look like.

        The aforementioned sharding behaviour of Kinesis unfortunately does not fit very well with a stateful processing model that has local state. The problem is that a shard (which owns a certain range of the key hash space) can be split arbitrarily, not just at the half-way point. This is a deliberate decision, as it gives operators the ability to deal with hot spots in the data: to give an extreme example, if there's one particular key with particularly high message volume, that key could be put in a shard all by itself. Kinesis doesn't provide any clever tooling for determining how to split shards — it's pretty much left up to the user.

        (Ian and I think that this was probably a design mistake. It would have been better to deal with hot spots by appending a random number to messages with the hot key, and thus scatter them across the hash space. However, that would mean losing the total ordering for those keys, and make aggregations harder. Trade-offs, trade-offs.)
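        As a purely illustrative aside, here is a minimal sketch of that salting idea (nothing Kinesis or Samza provides for you; the class and the "#" suffix convention are made up): a producer appends a small random suffix to a known-hot key so its messages scatter across shards, and the consumer strips the suffix before aggregating.

            import java.util.concurrent.ThreadLocalRandom;

            // Illustrative sketch of "salting" a hot partition key, at the cost of per-key ordering.
            public class HotKeySalter {
              private final int salts; // how many ways to spread a hot key, e.g. 16

              public HotKeySalter(int salts) {
                this.salts = salts;
              }

              /** Producer side: "session-123" becomes e.g. "session-123#7". */
              public String salt(String hotKey) {
                return hotKey + "#" + ThreadLocalRandom.current().nextInt(salts);
              }

              /** Consumer side: strip the suffix to recover the logical key before aggregating. */
              public String unsalt(String saltedKey) {
                int idx = saltedKey.lastIndexOf('#');
                return idx >= 0 ? saltedKey.substring(0, idx) : saltedKey;
              }
            }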

        Anyway, this means that whilst the mapping from message keys to shards is clear at one moment in time, that mapping may well change over time as shards are split and merged. Any stateful process that wants to partition its state along with the shards of the streams is thus going to have a hard time. This is not a problem with Samza; it's an intrinsic issue with the way Kinesis works.

        At the moment we're working on a prototype implementation of a Kinesis consumer which will ignore this issue for now. It should be fine for consuming messages, but probably won't support durable local state. (In-memory transient state is fine, and state backed by a remote datastore is fine.) The lack of support for local state is further reinforced by Kinesis only retaining data for 24 hours, and not supporting log compaction like Kafka does, making it unsuitable as a durable changelog for Samza's key-value stores.

        Despite all those caveats, I reckon this will be a useful feature.

        ryannedolan Ryanne Dolan added a comment -

        Martin Kleppmann As part of a project to replicate Kinesis streams to Kafka streams and vice versa, I've implemented:

        • a KinesisSystemConsumer which leverages the KCL
        • a KinesisSystemProducer which leverages the KPL
        • a BasicKinesisSystemProducer which talks directly to the Kinesis API
        • a KinesisSystemAdmin which queries the Kinesis API

        Planned:

        • a BasicKinesisSystemConsumer which talks directly to the Kinesis API

        Unsolved Mysteries:

        • how to synchronize checkpoints between the KCL and Samza
        • how to async-batch records without risking data loss

        I've got most of this working and am starting work on validating against production streams at LinkedIn. If you've worked on some of this same code, I'd love to see what we can share. If not, I don't mind stealing this ticket from you.
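        For reference, here is a rough, hypothetical sketch of the KCL-to-Samza handover described above. It assumes the KCL 1.x IRecordProcessor interface and Samza's IncomingMessageEnvelope; it is a sketch under those assumptions, not the actual code mentioned in this comment, and it deliberately leaves checkpointing out, since that is the open synchronization question listed above.

            import java.util.List;
            import java.util.concurrent.BlockingQueue;

            import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
            import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
            import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
            import com.amazonaws.services.kinesis.model.Record;

            import org.apache.samza.Partition;
            import org.apache.samza.system.IncomingMessageEnvelope;
            import org.apache.samza.system.SystemStreamPartition;

            // Rough sketch: the KCL Worker invokes processRecords(), and each record is wrapped
            // in an IncomingMessageEnvelope and put on a queue that a SystemConsumer's poll()
            // could drain. Checkpointing is left out on purpose.
            public class KinesisRecordProcessor implements IRecordProcessor {
              private final String systemName;
              private final String streamName;
              private final BlockingQueue<IncomingMessageEnvelope> queue;
              private SystemStreamPartition ssp;

              public KinesisRecordProcessor(String systemName, String streamName,
                                            BlockingQueue<IncomingMessageEnvelope> queue) {
                this.systemName = systemName;
                this.streamName = streamName;
                this.queue = queue;
              }

              @Override
              public void initialize(String shardId) {
                // Assumption: shards are mapped to Samza partitions elsewhere; partition 0 is a placeholder.
                this.ssp = new SystemStreamPartition(systemName, streamName, new Partition(0));
              }

              @Override
              public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
                for (Record record : records) {
                  byte[] payload = new byte[record.getData().remaining()];
                  record.getData().get(payload);
                  // Use the Kinesis sequence number as the Samza offset and the partition key as the key.
                  queue.offer(new IncomingMessageEnvelope(ssp, record.getSequenceNumber(),
                      record.getPartitionKey(), payload));
                }
              }

              @Override
              public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
                // On shard end or lease loss, a real implementation would decide whether and when
                // to checkpoint: that is the open question above.
              }
            }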

        closeuris Yan Fang added a comment -

        Hi Ryanne Dolan, thank you. Renato Javier Marroquín Mogrovejo is working on this as a GSoC project this summer, and he has already implemented Kinesis support as well, so you may want to reach out to him too.

        boneill42 Brian ONeill added a comment -

        Ryanne Dolan Renato Javier Marroquín Mogrovejo - Any chance we can help out as well?

        We will likely release some bridge code that connects Kinesis to Druid (via Tranquility). But we expect we'll need to add Samza/Storm/Spark to that bridge. Having used Storm extensively in the past, I'd rather go with Samza. If you have code available, we can put some effort into testing, and refining the checkpoint mechanisms.

        renato2099 Renato Javier Marroquín Mogrovejo added a comment -

        Hi Brian ONeill, I will try to put up the versions I did during Google Summer of Code, and we can tackle it from there together. I have been caught up with other things, sorry! But I will definitely get to it this week.

        jayson.minard Jayson Minard added a comment -

        The retention period for Kinesis is 1 to 7 days, with the default being 24 hours:

        Data records are accessible for a default of 24 hours from the time they are added to a stream. This time frame is called the retention period and is configurable in hourly increments from 24 to 168 hours (1 to 7 days). For more information about a stream’s retention period, see the section called “Changing the Data Retention Period”.

        from http://docs.aws.amazon.com/kinesis/latest/dev/service-sizes-and-limits.html
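        For what it's worth, extending retention is a single API call. A minimal sketch using the AWS SDK for Java v1 (the stream name is a placeholder):

            import com.amazonaws.services.kinesis.AmazonKinesis;
            import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
            import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodRequest;

            // Raise a stream's retention period from the 24-hour default to the 7-day maximum.
            public class ExtendRetention {
              public static void main(String[] args) {
                AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();
                kinesis.increaseStreamRetentionPeriod(new IncreaseStreamRetentionPeriodRequest()
                    .withStreamName("my-stream")        // placeholder stream name
                    .withRetentionPeriodHours(168));    // 7 days, the maximum quoted above
              }
            }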


          People

          • Assignee: martinkl Martin Kleppmann
          • Reporter: martinkl Martin Kleppmann
          • Votes: 2
          • Watchers: 13