Samza / SAMZA-402

Provide a "shared state" store among StreamTasks

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: container, kv
    • Labels:

      Description

      There has been a lot of discussion about shared state stores in SAMZA-353. Initially, it seemed as though we might implement them through SAMZA-353, but now it seems more preferable to implement them separately. As such, this ticket is to discuss global state/shared state (terms that are being used interchangeably) between StreamTasks.

      Attachments

      1. DESIGN-SAMZA-402-0.md (10 kB) - Chris Riccomini
      2. DESIGN-SAMZA-402-0.pdf (101 kB) - Chris Riccomini
      3. DESIGN-SAMZA-402-1.md (21 kB) - Chris Riccomini
      4. DESIGN-SAMZA-402-1.pdf (167 kB) - Chris Riccomini

        Issue Links

          Activity

          criccomini Chris Riccomini added a comment -

          Attaching initial design doc.

          The question at hand is whether the current approach is sufficient for us to proceed with an implementation.

          I'm leaning towards "no" at the moment. I feel as though the lack of atomicity, deletion support, and lagging state make this current design pretty difficult to use properly under most use cases.

          It seems as though using a cache in front of a remote DB is a much easier way to handle the read-only cache use case. I'm not quite sure what to do about the inter-task state use case, specifically as it relates to lagging state.

          martinkl Martin Kleppmann added a comment -

          I think this is a useful feature. Some more example use cases for a "map-side join" (shared state that is only read by the job):

          • A control channel for a job, allowing an administrator to adjust aspects of the job at runtime without a job restart. For example, a job could be deployed with two different algorithms and an A/B-testing facility. An administrator could adjust whether algorithm A or algorithm B should be used by updating a particular key in the shared store. (A sketch of what this might look like on the task side follows this list.)
          • A repository for Avro schemas (see SAMZA-317), where the store contains a mapping from schema ID to schema.
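
          A minimal sketch of the control-channel case from the task's side, assuming a read-only shared store looked up through the existing TaskContext/KeyValueStore API. The class name, store name, key, and string serdes are all hypothetical, and the shared store itself is the feature under discussion, not something Samza provides today:

            import org.apache.samza.config.Config
            import org.apache.samza.storage.kv.KeyValueStore
            import org.apache.samza.system.IncomingMessageEnvelope
            import org.apache.samza.task.{InitableTask, MessageCollector, StreamTask, TaskContext, TaskCoordinator}

            class AbTestingTask extends StreamTask with InitableTask {
              // Hypothetical shared store holding job-wide flags, written by an administrator
              // via the store's backing stream. Assumes string serdes for key and value.
              private var control: KeyValueStore[String, String] = _

              override def init(config: Config, context: TaskContext) {
                control = context.getStore("control-flags").asInstanceOf[KeyValueStore[String, String]]
              }

              override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
                // Re-read the flag on every message, so flipping it at runtime takes effect
                // without a job restart.
                val algorithm = Option(control.get("algorithm")).getOrElse("A")
                if (algorithm == "B") processWithB(envelope, collector) else processWithA(envelope, collector)
              }

              private def processWithA(envelope: IncomingMessageEnvelope, collector: MessageCollector) { /* algorithm A */ }
              private def processWithB(envelope: IncomingMessageEnvelope, collector: MessageCollector) { /* algorithm B */ }
            }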

          At the moment, if you want to implement those things, you have to do one of the following:

          • Create a stream for the shared state (with at least as many partitions as any other input stream), co-partition that stream with the input streams, and write each state update to all partitions of that stream (so that it is delivered to all StreamTasks in the job). That seems wasteful and ugly.
          • Connect to some external system (not managed by Samza), e.g. an external database. This increases the number of infrastructure components that need to be deployed, makes the job more complicated, and potentially performs worse. For example, the job may cache lookups to an external database, but without consuming a changelog from that database it won't know when cache entries need to be invalidated (you can only use a TTL and cross your fingers). A cold cache after container restart will perform badly as every lookup incurs a network round-trip.

          Both of those suck. Even a simple shared state abstraction (read-only, no support for atomic swaps, no special handling of deletions) would make the implementation of these kinds of use cases significantly nicer.

          If we want to support use cases where a batch job pushes a new version of the state that completely replaces the old version, then we would probably need atomic swaps and handling of deletions. For that reason, I'm inclined to not support such batch updates of shared state. Batch-updated state can continue to use Voldemort.

          Regarding mutable state shared between StreamTasks: I think this would be a dangerous abstraction for the reasons you describe. A true implementation of mutable shared state would require a consensus algorithm and would be a nightmare.

          I think single-writer state would probably be safe (as you describe, using the task name as key). However, I would prefer to think of this as a kind of asynchronous message passing: one task is sending a message to the other tasks, saying "my counter value is now x". Put that way, the key is the "sender" of the message.

          In my opinion, the key-value interface for a shared store should not permit writing (calling put() should raise an exception), to avoid setting false expectations of synchronous updates and magical distributed consistency. Job authors who know what they're doing can still write to the store asynchronously by sending a message to the output stream that is the changelog for the store. That way the write looks conceptually more like sending a message to the other tasks, and less like a state update. But that should be considered advanced usage, because it's up to the job author to enforce things like the single-writer constraint.
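
          To make that "advanced usage" concrete, here is a sketch of a task that updates the shared store only by sending to the stream that backs it. The system/stream names and the per-task key are placeholders, a string serde is assumed, and enforcing the single-writer-per-key convention is entirely up to the job author:

            import org.apache.samza.system.{IncomingMessageEnvelope, OutgoingMessageEnvelope, SystemStream}
            import org.apache.samza.task.{MessageCollector, StreamTask, TaskCoordinator}

            class CounterPublishingTask extends StreamTask {
              // Hypothetical: the stream that backs (is the changelog of) the shared store.
              private val sharedStateStream = new SystemStream("kafka", "shared-store-changelog")
              private var count = 0L

              override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
                count += 1
                // Asynchronous message passing: "my counter value is now x". Keying by a
                // per-task name (placeholder here) preserves the single-writer-per-key convention.
                collector.send(new OutgoingMessageEnvelope(sharedStateStream, "task-0", count.toString))
              }
            }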

          (If this kind of cross-task coordination use case turns out to be common, we could consider adding an abstraction on top of shared stores which enforces things like "one writer per key". But that can be a separate, future issue.)

          On SAMZA-353 we discussed whether the StreamTask should be notified about changes in the store. I now think that probably isn't necessary, at least for a first version.

          In summary: just because certain use cases can't easily be satisfied, we shouldn't throw the baby out with the bathwater. I think we should implement a simple version of shared state which is read-only and which only supports single-key updates (no batch updates, no atomic switching), like you describe in the implementation section of the design doc. That would already be very useful, and leave open our options to support more use cases in future.

          criccomini Chris Riccomini added a comment -

          If we want to support use cases where a batch job pushes a new version of the state that completely replaces the old version, then we would probably need atomic swaps and handling of deletions. For that reason, I'm inclined to not support such batch updates of shared state. Batch-updated state can continue to use Voldemort.

          Yea, I was thinking along this line as well. If you want atomic swaps, you'll have to use a remote store.

          In my opinion, the key-value interface for a shared store should not permit writing (calling put() should raise an exception), to avoid setting false expectations of synchronous updates and magical distributed consistency.

          Agreed. I think this was the main difference between my latest proposal and yours. I was saying that put() should write to the DB immediately. I've come to realize that it's not easily implementable, and pretty confusing, so I agree with what you're saying here.

          On SAMZA-353 we discussed whether the StreamTask should be notified about changes in the store. I now think that probably isn't necessary, at least for a first version.

          I was planning to punt on this as well. We could always add some callback or something like that later.

          In summary: just because certain use cases can't easily be satisfied, we shouldn't throw the baby out with the bathwater. I think we should implement a simple version of shared state which is read-only and which only supports single-key updates (no batch updates, no atomic switching), like you describe in the implementation section of the design doc. That would already be very useful, and leave open our options to support more use cases in future.

          I guess the question is: does this implementation provide enough of a benefit to be worth doing, vs. just a remote store with a local cache? The two arguments that I can come up with are:

          1. Operational complexity of running a remote store.
          2. Performance will be better if there are no remote queries.

          The deciding factor to me on which approach is actually going to be "better" for a Samza job is whether the state that it needs is already in a DB. If it's already in a DB, and has to continue to remain there for other reasons, then there is complexity in setting up a change log and having the Samza job consume the state (vs. just querying it). If it's not, then the global state solution seems preferable (since the data is probably coming from a Hadoop push).

          It kind of feels like there are two use cases here:

          1. Primary data exists on a remote DB and is being used by other stuff (e.g. front ends) in addition to the Samza job.
          2. Derived data is computed offline, and needs to be pushed somewhere for the Samza job to use.

          For (1), remote DB with cache seems better. For (2), I think the global store is better.

          martinkl Martin Kleppmann added a comment -

          The deciding factor to me on which approach is actually going to be "better" for a Samza job is whether the state that it needs is already in a DB. If it's already in a DB, and has to continue to remain there for other reasons, then there is complexity in setting up a change log and having the Samza job consume the state (vs. just querying it).

          If the DB is already there, then the job author can just use the client library to query it directly. Any caching that is put in place isn't really specific to Samza in any way. So I am inclined to think that Samza doesn't need explicit support for calls to external DBs — people can just use whatever existing mechanisms there are. Or am I missing something?

          If it's not, then the global state solution seems preferable (since the data is probably coming from a Hadoop push).

          I agree that the discussed approach for global state is preferable if there is no existing DB (due to the operational complexity of running the additional DB). However, I wouldn't assume that such global state would be coming from Hadoop. Indeed, the two example use cases I gave above have nothing to do with Hadoop.

          criccomini Chris Riccomini added a comment -

          Any caching that is put in place isn't really specific to Samza in any way. So I am inclined to think that Samza doesn't need explicit support for calls to external DBs — people can just use whatever existing mechanisms there are. Or am I missing something?

          I agree Samza doesn't need to provide any direct support for caching. One thing we're discussing implementing is a ReadThroughCache store. The reason for implementing the cache as a store is that you can use it as a warm cache when the job is restarted if you were to attach a changelog to it. Implementing this cache shouldn't change Samza at all, but it'd be nice for users to have something to use in cases where they are going to query a remote store.
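
          For illustration only (not the actual proposed code), a minimal sketch of what a store-backed read-through cache could look like, assuming a caller-supplied remote lookup function. Hits are served from the local store; misses fall through to the remote lookup and are written back, so a changelog attached to the local store gives a warm cache after a container restart:

            import org.apache.samza.storage.kv.KeyValueStore

            // Hypothetical wrapper, not part of Samza: combines a local store with a remote lookup.
            class ReadThroughCache[K, V <: AnyRef](localStore: KeyValueStore[K, V], remoteLookup: K => V) {
              def get(key: K): V = Option(localStore.get(key)).getOrElse {
                val fetched = remoteLookup(key)
                // Cache the remote result locally so subsequent gets avoid the network round-trip.
                if (fetched != null) localStore.put(key, fetched)
                fetched
              }
            }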

          criccomini Chris Riccomini added a comment -

          Attaching updated design docs. Sorry, it got a bit bigger.

          • Called out difference between unpartitioned local state and remote external state.
          • Atomicity, handling deletions, message mutation, inter-task state, and lagging state sections are largely unchanged.
          • Added a "proposed solutions" section to propose a an external read-through cache.
          • Updated implementation section to think through the high-level design required to unify the existing partition state implementation in Samza with external read-through cache and unpartitioned local state. The goal is to re-use and generalize as much of what we already have as possible.
          martinkl Martin Kleppmann added a comment -

          +1 on the proposal. I'm in favour of the use-case-specific configuration (option 2), since it's easier to use, and I can't see any benefit in the greater generality of a more complex configuration.

          Detail question: from which changelog partition(s) should a local shared store be restored/updated? If several partitions are consumed, this introduces an ordering problem. I am inclined to say that this stream should be created with one partition, and if there are several partitions, the job only reads from partition 0 and ignores the others. As shared local state is intended for fairly small data volumes, it shouldn't be a problem to put it all in one partition.

          criccomini Chris Riccomini added a comment - edited

          from which changelog partition(s) should a local shared store be restored/updated

          I was thinking all of them.

          If several partitions are consumed, this introduces an ordering problem.

          I think this ordering problem only matters in two scenarios:

          1. There are writes for the same key in different partitions.
          2. The store implementation is somehow order-dependent (write k1, write k2 results in a different state than write k2, write k1).

          I was thinking of the input streams as being partitioned by key, so (1) doesn't seem to be a problem to me. I hadn't much considered (2).

          I am inclined to say that this stream should be created with one partition, and if there are several partitions, the job only reads from partition 0 and ignores the others. As shared local state is intended for fairly small data volumes, it shouldn't be a problem to put it all in one partition.

          I was kind of thinking the reverse: consume from everything by default, and assume that the stream is partitioned by key and the store doesn't care about ordering of writes for different keys. The developer can always create their input stream to have a single partition, which should automatically solve both problems (1) and (2) if they need to, but I think the default use case should be to just read everything and load it in. This seems to me like the most common pattern.

          criccomini Chris Riccomini added a comment -

          Attaching a link to KAFKA-560. It's possible that we might be able to leverage this feature to do atomic swaps with global state stores by having a data push go to a new topic every time, and then updating the SamzaContainer to consume from the new topic. This is a placeholder so we don't lose the KAFKA ticket that would allow the old topics to expire after some delay.

          martinkl Martin Kleppmann added a comment -

          I was thinking of the input streams as being partitioned by key, so (1) doesn't seem to be a problem to me. I hadn't much considered (2).

          As long as we're restricting ourselves to a plain key-value data model, that's probably ok. However, I still have a mild preference for using a single partition, as I think it could be less confusing in some edge cases. Say you write to the stream using a client (in some other system) which doesn't do key partitioning in the same way, or which accidentally omits the partitioning key. If we're consuming a single partition, that write will either take effect (if it's in partition 0) or be ignored, but the outcome is deterministic. If we're consuming all partitions, the store ends up in a nondeterministic state which could change when it is rebuilt. So it seems to me that a single partition is less error-prone, and I can't see a compelling advantage of using multiple partitions.

          criccomini Chris Riccomini added a comment -

          Say you write to the stream using a client (in some other system) which doesn't do key partitioning in the same way, or which accidentally omits the partitioning key. If we're consuming a single partition, that write will either take effect (if it's in partition 0) or be ignored, but the outcome is deterministic.

          But in this case, isn't the fact that it's deterministic kind of irrelevant since the state is basically corrupted (since a portion of the writes are totally disregarded)? It seems that in this scenario a more desirable thing would be to just fail outright if the input stream has more than one partition.

          So it seems to me that a single partition is less error-prone, and I can't see a compelling advantage of using multiple partitions.

          Two reasons that I like the multi-partition approach are:

          1. The Samza job reading the input stream for global state might not have control over the partition size. For example, if it's consuming from the changelog of another Samza job to build its global state.
          2. I can foresee people pushing data from Hadoop (or some other mechanism) to a topic that doesn't yet exist. When this happens, the default partition size will be used, which in most real-world production clusters is a partition count > 1 (a wild guess, but it's true at LI, anyway). When this happens, the Samza job will either disregard all of the state except partition 0, or fail the job (depending on implementation). The developer will then be forced to either shrink the topic partition size (can't be done in Kafka), or create a new stream and delete the old one (topics can't be deleted in Kafka either, yet).
          guozhang Guozhang Wang added a comment -

          Are there other use case examples besides ML weights for unpartitioned local state? For ML usage alone, the global model parameters are usually updated at some synchronized barrier, such as the end of each iteration. And some recent work has enabled users to configure whether they want to update these global parameters synchronously or "asynchronously", allowing different processors to read different versions of the parameter values (e.g. GraphLab).

          criccomini Chris Riccomini added a comment -

          Are there other use case examples besides ML weights for unpartitioned local state?

          Yes. We kind of whittled things down to two different use cases:

          1. Read-only global state that functions as a table to do a stream-table join.
          2. Read-write global state that functions as a way for tasks to share their state with each other.

          (1) is usually used when you're computing some static data set and pushing it to the job periodically. This could be done via a Hadoop to Kafka push, or simply by consuming the state from another Samza job's changelog stream.

          (2) is useful in the ML case you've described. We have opted not to directly support read-write (ML-style) state. As you've said, for iterative algorithms, it's definitely possible to have calculations done locally, and just periodically sync the global state via some secondary Samza job. The counting example in the design doc could be achieved simply by having every StreamTask periodically send their local counts, and having a second aggregator job calculate the full count.
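
          A rough sketch of that counting pattern, assuming a hypothetical "partial-counts" topic, a placeholder per-task key, and a string serde. Each task counts locally and periodically publishes its partial count; a second aggregator job would consume "partial-counts", keep the latest value per task key, and sum them to get the full count:

            import org.apache.samza.system.{IncomingMessageEnvelope, OutgoingMessageEnvelope, SystemStream}
            import org.apache.samza.task.{MessageCollector, StreamTask, TaskCoordinator, WindowableTask}

            class LocalCountTask extends StreamTask with WindowableTask {
              private val partialCounts = new SystemStream("kafka", "partial-counts") // hypothetical topic
              private var count = 0L

              override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
                count += 1
              }

              // window() fires on the interval configured via task.window.ms.
              override def window(collector: MessageCollector, coordinator: TaskCoordinator) {
                collector.send(new OutgoingMessageEnvelope(partialCounts, "task-0", count.toString))
              }
            }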

          In addition, (2) can be achieved indirectly by having StreamTasks send messages directly to the read-only store's changelog stream. This is "advanced", since race conditions pop up when you don't have a single writer for a given key in the store. We've opted to make this possible, but not directly supported. Basically, it's discouraged.

          In short, I agree that (2) can be achieved in several alternative ways, so we aren't directly supporting it. The current proposal is for read-only global state.

          Martin Kleppmann might have some more ideas for how (2) could be used, as well.

          closeuris Yan Fang added a comment - edited

          +1 for the proposal.

          1. Two other use cases I can think of for the global state are:

          • Continuously updating an ML model. This is related to the ML weights example in the proposal. Since we enable updating the state, this opens up the possibility of updating the ML model without stopping the Samza job. Users update/retrain the model in a batch process and then push the result to the state stream. Samza then reads the updated state (that is, the new model) and keeps processing. The limitation is that this only works when users change only the model's weights, not the number of features or the algorithm. But I think changing the parameters of a model happens a lot. This even makes tuning the model possible in Samza!
          • Controlling the Samza job. If the state can be updated, controlling the Samza job through the state stream seems feasible. Task.process reads the state and reacts to it as a command. It's a little like SAMZA-348 but has a different goal.

          2. A question about the "read-only" concept. Do we plan to provide a mechanism to really guarantee "read-only"? The state stream can actually be updated even by the same Samza job, so it's more like "write" is discouraged. It may be related to what Martin brings up in SAMZA-300, some lock-like stuff.

          3. How do we localize the "global state"? I'm not sure whether we leave this to users or do it ourselves. If we do it, users' lives become easier because they can simply look up the "global key" and get the value. The question for us is then where we want to store the "global state". Putting the global state in the local state store seems better than putting it in a HashMap, but then it requires the "global key" to be different from any "local key"; otherwise it will occasionally be overridden by local operations. If we leave this to users, they need to write an extra task to process the global state, which I think may be too much. Of course, we can always provide a default setting and an API to override it.

          criccomini Chris Riccomini added a comment -

          Do we plan to provide a mechanism to really guarantee "read-only"?

          I don't think so. I was thinking we'd just have the read-only store not have a put() method (i.e. it wouldn't be a KeyValueStore, it'd be a ReadOnlyKeyValueStore, or something).
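
          For illustration, one possible shape of such a read-only interface, purely hypothetical and simply mirroring the read methods of the existing KeyValueStore:

            import org.apache.samza.storage.kv.KeyValueIterator

            // Hypothetical: the read half of KeyValueStore, so put()/delete()
            // don't exist at the type level for shared stores.
            trait ReadOnlyKeyValueStore[K, V] {
              def get(key: K): V
              def range(from: K, to: K): KeyValueIterator[K, V]
              def all(): KeyValueIterator[K, V]
            }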

          The state stream can actually be updated even by the same Samza job, so it's more like "write" is discouraged. It may be related to what Martin brings up in SAMZA-300, some lock-like stuff.

          You're correct. It's more like "write is discouraged". There's really no good way for us to prevent writes to the changelog (that I can think of). Personally, I think this is OK.

          How do we localize the "global state"? I'm not sure whether we leave this to users or do it ourselves.

          This was discussed a bit in SAMZA-353. In the current proposal, the SamzaContainer will be responsible for localizing the global state (not the StreamTask). The StreamTask will get the global state store using the TaskContext, as with any other store, and will call .get() on it, as usual (assuming it's a KeyValueStore).

          Putting the global state in the local state store seems better than putting it in a HashMap.

          Agreed. The current proposal is to have the SamzaContainer put the global state into a local state store (shared among StreamTasks, one per container).

          But then it requires the "global key" to be different from any "local key"; otherwise it will occasionally be overridden by local operations.

          I think we'd just have two stores in this scenario:

          def init(config: Config, context: TaskContext) {
            // Both stores come from the TaskContext; only the local one is writable.
            // (String serdes assumed here for illustration.)
            val localStore = context.getStore("my-local-store").asInstanceOf[KeyValueStore[String, String]]
            val globalStore = context.getStore("my-global-store").asInstanceOf[ReadOnlyKeyValueStore[String, String]]
          }
          
          criccomini Chris Riccomini added a comment -

          Attaching a link to KAFKA-1639. Atomic pushes and swaps could be implemented via control messages.


            People

            • Assignee: Unassigned
            • Reporter: criccomini Chris Riccomini
            • Votes: 2
            • Watchers: 11
