Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.6.0
    • Fix Version/s: 0.8.0
    • Component/s: container
    • Labels:

      Description

      Currently the AM sends a set of all the topics and partitions to the container, which then groups them by partition and assigns each set to a task instance. By moving the grouping to the AM, we can assign arbitrary groups to task instances, which will allow more partitioning strategies, as discussed in SAMZA-71.

      1. SAMZA-123-2.patch
        193 kB
        Jakob Homan
      2. SAMZA-123-3.patch
        193 kB
        Jakob Homan
      3. SAMZA-123-4.patch
        202 kB
        Jakob Homan
      4. SAMZA-123-5.patch
        204 kB
        Jakob Homan
      5. SAMZA-123-6.patch
        214 kB
        Jakob Homan
      6. SAMZA-123-7.patch
        212 kB
        Jakob Homan
      7. SAMZA-123-8.patch
        252 kB
        Jakob Homan
      8. SAMZA-123-design-doc.md
        21 kB
        Jakob Homan
      9. SAMZA-123-design-doc.pdf
        161 kB
        Jakob Homan
      10. SAMZA-123-draft.patch
        171 kB
        Jakob Homan

        Issue Links

          Activity

          jghoman Jakob Homan added a comment - - edited

          If we treat the assignment of topic-partitions to task instances as a group-by operation, it's easy to obtain the three most useful strategies: grouped by partition, one task instance per topic-partition (highest granularity), and one task instance for all topic-partitions (lowest granularity):

          As a summary of the code:

          import scala.collection.immutable._
          case class TopicPartition(topic:String, partition:Int)
          
          trait TopicPartitionGrouper {
            def group(tps:Set[TopicPartition]):Set[Set[TopicPartition]]
          }
          
          val allTps = Set(TopicPartition("DC1-Pageview", 0), 
                           TopicPartition("DC1-Click", 0), 
                           TopicPartition("DC2-Pageview", 1),
                           TopicPartition("DC2-Click", 1), 
                           TopicPartition("DC1-Pageview", 2),
                           TopicPartition("DC3-Combined", 0))
          
          /** Our current approach **/
          class GroupByPartition() extends TopicPartitionGrouper {
            def group(tps:Set[TopicPartition]) = tps.groupBy(_.partition).map(_._2).toSet
          }
          
          class GroupByTopicPrefix extends TopicPartitionGrouper {
            def group(tps:Set[TopicPartition]) = tps.groupBy(_.topic.subSequence(0,3)).map(_._2).toSet
          }
          
          class GroupByTopicPrefixAndPartition extends TopicPartitionGrouper {
            def group(tps:Set[TopicPartition]) = tps.groupBy(tp => (tp.topic.subSequence(0,3), tp.partition)).map(_._2).toSet
          }
          
          class OnePerTI extends TopicPartitionGrouper {
            def group(tps:Set[TopicPartition]) = tps.map(Set(_))
          }
          
          class OneBigTI extends TopicPartitionGrouper {
            def group(tps:Set[TopicPartition]) = Set(tps)
          }
          
          val strategies = Map("partition" -> new GroupByPartition,
                               "topicPrefix" -> new GroupByTopicPrefix,
                               "topicPrefixAndPartition" -> new GroupByTopicPrefixAndPartition,
                               "onePerTI" -> new OnePerTI,
                               "oneBigTI" -> new OneBigTI)
          
          for(s <- strategies) {
            println("For strategy " + s._1 + " we would get the following task instances:")
            s._2.group(allTps).foreach(g => println("  TI: " + g))
          }

          gives us each of the combinations we would want:

          (M=d929d1) jhoman@JHOMAN-MN s005 ~/scalas/scala-2.8.2.final> bin/scala ~/explain.scala 
          For strategy partition we would get the following task instances:
            TI: Set(TopicPartition(DC1-Pageview,2))
            TI: Set(TopicPartition(DC2-Click,1), TopicPartition(DC2-Pageview,1))
            TI: Set(TopicPartition(DC3-Combined,0), TopicPartition(DC1-Pageview,0), TopicPartition(DC1-Click,0))
          For strategy oneBigTI we would get the following task instances:
            TI: Set(TopicPartition(DC1-Pageview,2), TopicPartition(DC2-Click,1), TopicPartition(DC2-Pageview,1), TopicPartition(DC3-Combined,0), TopicPartition(DC1-Pageview,0), TopicPartition(DC1-Click,0))
          For strategy topicPrefixAndPartition we would get the following task instances:
            TI: Set(TopicPartition(DC3-Combined,0))
            TI: Set(TopicPartition(DC1-Pageview,0), TopicPartition(DC1-Click,0))
            TI: Set(TopicPartition(DC1-Pageview,2))
            TI: Set(TopicPartition(DC2-Click,1), TopicPartition(DC2-Pageview,1))
          For strategy topicPrefix we would get the following task instances:
            TI: Set(TopicPartition(DC3-Combined,0))
            TI: Set(TopicPartition(DC2-Click,1), TopicPartition(DC2-Pageview,1))
            TI: Set(TopicPartition(DC1-Pageview,2), TopicPartition(DC1-Pageview,0), TopicPartition(DC1-Click,0))
          For strategy onePerTI we would get the following task instances:
            TI: Set(TopicPartition(DC1-Pageview,2))
            TI: Set(TopicPartition(DC2-Click,1))
            TI: Set(TopicPartition(DC2-Pageview,1))
            TI: Set(TopicPartition(DC3-Combined,0))
            TI: Set(TopicPartition(DC1-Pageview,0))
            TI: Set(TopicPartition(DC1-Click,0))

          This JIRA will change the topic partitions that are passed to the containers to be grouped per the grouping function. The containers in turn directly instantiate task instances with the provided groups. The containers themselves do not have to be aware of the grouping function that was used. Additionally, with the changes from SAMZA-122, it's possible to change the strategy for a job after it's been run once, in order to improve performance. However, changing the tp grouping after job creation won't play nice with state, so that should be called out in the changes.

          criccomini Chris Riccomini added a comment -

          No complaints on my end. Seems useful, and minimally invasive.

          jghoman Jakob Homan added a comment -

          Attaching the design doc for the changes proposed here (and in SAMZA-71). It covers broadly what changes are made to the Samza framework and specifically how these changes affect the checkpoint log and the state facilities. Both the Markdown source and a rendered PDF version are attached.

          jghoman Jakob Homan added a comment -

          Update to docs to fix formatting.

          jkreps Jay Kreps added a comment -

          Two ideas:
          1. The first is to use the terminology task id rather than cohort (in the code, not the write-up). This is a minor issue but task id is more intuitive to me.
          2. Something about the C2P log rubs me the wrong way. A cheaper way to store the mapping would just be in zk, although that would add a zk dependency from the AM.

          An alternative to the C2P topic would be to use the checkpoint topic to store the task definitions:

          Tasks have an id 0, 1, ..., N and we store their definition (i.e. the id and its inputs). E.g. if we had inputs A and B and used the GroupBySSP strategy we would have:

          0: {A/0, B/0}, 1: {A/1, B/1}, ..., N: {A/N, B/N}

          The mapping to any change logs is just by the task id.

          The grouping API is then:

          interface SSPGrouper {
            public Map<Integer, Set<SystemStreamPartition>> group(Map<Integer, Set<SystemStreamPartition>> current, Set<SystemStreamPartition> ssps);
          }

          Is this better or worse? Not sure. This version of the SSPGrouper api is less convenient I think but makes implementing validation easier.
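
          For concreteness, a minimal Scala sketch of how an implementation of that alternative API might use the current mapping for validation (the Scala rendering of the interface and the stand-in SystemStreamPartition case class are assumptions for illustration, not from any patch):

          // Stand-in for Samza's SystemStreamPartition, to keep the sketch self-contained.
          case class SystemStreamPartition(system: String, stream: String, partition: Int)

          trait SSPGrouper {
            def group(current: Map[Int, Set[SystemStreamPartition]],
                      ssps: Set[SystemStreamPartition]): Map[Int, Set[SystemStreamPartition]]
          }

          // Group by partition, but warn when a task id's proposed inputs differ from
          // the previously stored assignment -- the validation mentioned above.
          class ValidatingGroupByPartition extends SSPGrouper {
            def group(current: Map[Int, Set[SystemStreamPartition]],
                      ssps: Set[SystemStreamPartition]) = {
              val proposed = ssps.groupBy(_.partition)
              for ((taskId, sspSet) <- proposed; previous <- current.get(taskId); if previous != sspSet)
                println("Task " + taskId + " inputs changed: was " + previous + ", now " + sspSet)
              proposed
            }
          }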

          I think the question is really what the fate of the checkpoint topic is. As you mention KAFKA-1000 is technically superior to what we are doing. However there are several additional things it would be good to discuss and think through:

          The first is the additional metadata we would need to retain for fault tolerance semantics (the producer id and sequence number). These would be per task and need to be transactionally written with the offsets.

          The other use of the checkpoint log was to be able to run an active-passive pair of clusters.

          I think both of these things should work equally well with KAFKA-1000, but it would be good for us to just do the thinking since we have an intern coming in a month anyway to work on this...

          jghoman Jakob Homan added a comment -

          1. The first is to use the terminology task id rather than cohort (in the code, not the write-up). This is a minor issue but task id is more intuitive to me.

          Do you just mean a terminology change? My concern is that task id is confusing (regrettably) with container ID. Part of the appeal of cohort is that it is not in any way an already-used term (as compared to partition, set, key, group, descriptor, tag, etc.), so it is more likely users will learn the nuances of what we're trying to do rather than make (likely) incorrect assumptions.

          interface SSPGrouper {
            public Map<Integer, Set<SystemStreamPartition>> group(Map<Integer, Set<SystemStreamPartition>> current, Set<SystemStreamPartition> ssps);
          }

          Is this better or worse? Not sure.

          This is worse, as it makes the resulting mapping much more opaque for anything other than group-by-partition, particularly for some of the more flexible options available through this new approach, such as the per-data-center grouping described here. For use cases like those, where a string (or other non-integer) cohort is the natural key, all the metrics, logging, etc. would end up keyed by an arbitrary number rather than the meaningful key we could use now.
          Also, for the group-by-SSP implementation, we can map the state log partitions directly to the input-stream partitions naturally, but I'm not sure how much benefit that would be.

          After KAFKA-1000 and moving the checkpoint offset retrieval from the stream tasks to the AM (and those being passed to the tasks directly), there's really no reason for a checkpoint log as such, since the SSPGrouper determines the actual per-TI SSP set membership. However, the other use cases you describe point to the potential need for a per-job state log (in contrast to the state logs for the tasks), into which information (keyed by cohort) could be maintained. Rather than call it the C2P log (or whatever), how about we just call it the job state log, and store a map {cohort => {whatever we need, starting with the state log partition mapping}}? This would allow for easy future expansion without setting a precedent of an extra Kafka log for each new piece of useful information.
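
          For concreteness, a tiny sketch of the kind of entry such a job state log could hold (the layout and key names here are assumptions, purely to illustrate the idea):

          // One map per job: cohort -> whatever per-cohort bookkeeping we need,
          // starting with the state log partition mapping and extensible later on.
          val jobState: Map[String, Map[String, String]] =
            Map("DC1-0" -> Map("stateLogPartition" -> "0"),
                "DC1-1" -> Map("stateLogPartition" -> "1"))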

          sriramsub Sriram Subramanian added a comment -

          Posting on JIRA

          1. Most of the explanation about retaining the checkpoint and state by task id applies only to the first two grouping strategies. The group-by-partition and group-by-SSP strategies maintain the task/partition mapping when more partitions are added. I agree that changing the grouping strategy should be undefined behavior and we can try to warn. However, there are cases where the grouping strategy could be the same and yet the partitions can be mapped to different tasks. For example, in the case of GroupIntoNSets, you could have the same strategy but changing the number of TIs (in order to scale for changing profile over time) would map the partitions to a different task instance. Now are the checkpoint and state information for that task valid? We can try to warn by storing the task id and the partitions it was previously mapped to, but it gets confusing. I would like to understand how we would explain the grouping strategy, state and checkpoint working consistently across different grouping strategies. If we cannot do that, we should not make this feature extensible and should rather provide fixed strategies. (A sketch illustrating the GroupIntoNSets remapping follows at the end of this comment.)

          2. My vote for the naming would be taskName if it is a string and taskId if we can map all use cases to ids. Also, the naming for getStorePartitionDir needs to be something like

          def getStoreTaskDir(storeBaseDir: File, storeName: String, taskName: String) = {
            new File(storeBaseDir, storeName + File.separator + taskName)
          }

          3. I am confused about how this grouping strategy can be extended by the user of the framework. Ideally, one should be able to implement the grouping strategy and provide that as a value in the framework config "grouping.strategy=org.apache.mypackage.MyGroupingStrategy". However, your third strategy requires an additional config to be added to the framework. Is the expectation that anyone who extends the grouping strategy needs to add their own configs to the framework and wire them in?

          4. It would also be useful to talk about how these grouping strategies map to a SQL-like way of expressing a Samza topology. Are there any other grouping strategies that might require a lot more change in the framework than just implementing this API?

          5. Finally, unless we are very sure about this working out well, we should not make this a public API. The framework can provide three strategies and those are the only available strategies for now. Opening it to be extensible might cause us to break a lot of client logic if we find our assumptions were wrong.
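
          To make the concern in point 1 concrete, here is a minimal Scala sketch of a hash-based GroupIntoNSets (this particular implementation is assumed for illustration, not taken from the design doc or any patch); the same strategy with a different N can send the same partition to a different task instance:

          case class TopicPartition(topic: String, partition: Int)

          // Assign each topic-partition to one of n sets by hashing it.
          def groupIntoNSets(tps: Set[TopicPartition], n: Int): Map[Int, Set[TopicPartition]] =
            tps.groupBy(tp => math.abs(tp.hashCode) % n)

          val tps = Set(TopicPartition("Pageview", 0), TopicPartition("Pageview", 1),
                        TopicPartition("Pageview", 2), TopicPartition("Pageview", 3))

          // Same strategy, different N: partitions can land in different task instances,
          // so their previous checkpoints and state no longer line up.
          println(groupIntoNSets(tps, 2))
          println(groupIntoNSets(tps, 3))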

          criccomini Chris Riccomini added a comment -

          1. I agree with Jay Kreps about the C2P topic. I'd either like to roll the C2P mapping into the Checkpoint object (and topic), or have a generic topic as Jakob Homan describes. I think having a generic topic is a big change that needs to be thought through, though (see 6, below). What else would we put in there, what would it be used for, etc.

          2. The section on verification is vague:

          Second, the AM can diff the previously C2P log entry with the result of the current set of mappings and warn (or fail) if these values disagree to some extent.

          You then go on to say that we can't verify because the logic is job dependent. What exactly is the proposed logic for this? I think we should think through the impact of adding/removing/moving SSPs on state for every strategy.

          3. I agree with Sriram Subramanian's comments on the mailing list: we should not add configs to make this fully pluggable until we're more convinced that all of this is a good idea. We did this for the MessageChooser, and I think it's the right approach here as well.

          4. I like @sriram's naming suggestion:

          My vote for the naming would be taskName if it is a string and taskId if we can map all use cases to ids.

          5. What exactly is the use case for GroupIntoNSets? If we decouple both checkpointing and state from the number of TaskInstances we have, why can't we just use GroupBySSP, and use the yarn.container.count to control the "N"?

          6. Per Martin Kleppmann, we shouldn't use the word "state" again:

          how about we just call it the job state log

          It's just confusing, since we already use it for all the state management stuff.

          That said, the "job state" that a job has consists of the input/output offsets for the job, its configuration, and its SSP cohort mappings. We've been treating these separately in the framework, but perhaps it makes sense to treat them the same way? Storing config between job runs allows us to verify config changes are "safe". It also allows us to do one-time mutations of the "job state" via CLI, or at job-start time (see SAMZA-180). In this model, I'm not sure if it would make sense to use KAFKA-1000, or just keep the offsets in the "job state" topic--I'd have to think about it more.

          jghoman Jakob Homan added a comment - - edited

          For example, in the case of GroupIntoNSets, you could have the same strategy but changing the number of TIs (in order to scale for changing profile over time) would map the partitions to a different task instance. Now are the checkpoint and state information for that task valid?

          Again, depends on the job itself.

          • A job that uses the state to record per-SSP commutative information, such as a job that does per-SSP buffered sums (i.e., counts some property of the SSP, stores the count in the data store and emits those values on a regular basis, perhaps for further aggregation upstream), would be fine. The existing values would be emitted and then not updated again. The SSPs in their new task homes would start from 0 and go from there (contingent on our current at-least-once guarantee; once we have consumer-based checkpointing and an idempotent producer, this would continue to work with exactly-once as well).
          • A job that does a join but without framework-managed state would be fine, as the keys would go ahead and hash to their new homes and still be available for pairing.
          • A job that does a join and requires framework-managed state would not be fine (i.e. would break), because this amounts to a mid-stream change in the partitioning function, which would be invalid in pretty much any case. This would be equivalent to changing the upstream partitioning and expecting the saved state to be valid, except that in that case we wouldn't even know it had happened, whereas with this we could detect the change.

          My vote for the naming would be taskName if it is a string and taskId if we can map all use cases to ids.

          As above, I do not want to use anything that is overloaded or could reasonably be confused by a newcomer with existing YARN or Map-Reduce terminology. Cohort is nice because people will have to learn what it is and how to use it. The novelty is a feature, not a bug.

          Is the expectation that anyone who extends the grouping strategy need to add their own configs to the framework and wire them in?

          Depends on the implementation, but this could certainly be the case. Some implementations wouldn't require any extra configuration, others could have it hardwired in and some would need configuration-time definition. This is true with pretty much anything we have pluggable.

          Are there any other grouping strategies that might require a lot more change in the framework that just implementing this API?

          Not that I can think of and in fact this feature may be an important help in bringing SQL-like capabilities to Samza, as it allows very precise control over SSP-to-TI assignment which would be useful for join optimization, etc.

          Finally, unless we are very sure about this working out well, we should not make this a public API.

          I personally am, but I don't think that with a project this young we should be overly cautious about experimenting or trying to be more feature-rich. We're not yet 1.0 and are quite up-front that the framework is evolving. We've provided no API guarantees thus far.

          jghoman Jakob Homan added a comment - - edited

          I'd either like to roll the C2P mapping into the Checkpoint object (and topic), or have a generic topic as Jakob Homan describes. I think having a generic topic is a big change that needs to be thought through, though (see 6, below). What else would we put in there, what would it be used for, etc.

          Aye, but the CheckpointManager interface is defined thusly:

          /**
           * Used as a standard interface for writing out checkpoints for a specified partition.
           */
          public interface CheckpointManager {

          so if we stuff the task-state-partition mapping into it, we're redefining the interface pretty dramatically and it's no longer really a CheckpointManager. It's also going to be a pretty ugly commingling of relatively orthogonal pieces of the code (checkpoint manager and state). Hence the suggestion of starting the job-state log with an easily extendable map interface while slowly transitioning away from the checkpoint log via KAFKA-1000 and AM-provided offsets.

          What exactly is the proposed logic for this? I think we should think through the impact of adding/removing/moving SSPs on state for every strategy.

          Assuming we have the currently defined cohort -> Set[SSP] mapping (current) and the last one loaded from the C2P/Job-State/Checkpoint/Whatever log (previous),

          • Calculate the complements between the previous set of cohorts and the current cohorts, logging about any new or missing cohorts. INFO seems reasonable to me, but WARN would work too.
          • For those cohorts in the intersection of current and previous, calculate the complements between the current and previous Set[SSP]. Log about any new or missing SSPs.
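
          A minimal Scala sketch of that diff (generic over whatever the SSP type ends up being; the method name is just illustrative):

          def reportChanges[SSP](previous: Map[String, Set[SSP]],
                                 current: Map[String, Set[SSP]]): Unit = {
            // New or missing cohorts.
            (current.keySet -- previous.keySet).foreach(c => println("INFO: new cohort " + c))
            (previous.keySet -- current.keySet).foreach(c => println("INFO: missing cohort " + c))
            // For cohorts present in both, new or missing SSPs.
            for (c <- current.keySet intersect previous.keySet) {
              (current(c) -- previous(c)).foreach(ssp => println("INFO: cohort " + c + " gained " + ssp))
              (previous(c) -- current(c)).foreach(ssp => println("INFO: cohort " + c + " lost " + ssp))
            }
          }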

          Again, this reporting functionality is a new level of visibility and safety that we do not currently provide (and would have only a circuitous route to providing) with the current configuration. Users right now can add/delete SSPs from their task inputs (either manually or via something like the RegExTopicGenerator) and will get no warning.
          Logging rather than failing is the appropriate response, as detailed in the answer to Sriram's question above (and in the design doc) about how different jobs respond to such changes. If a job needs heavier-handed enforcement, the VerifyingSSPGrouper would work great.

          We did this for the MessageChooser, and I think it's the right approach here as well.

          I don't think it was the right choice with MessageChooser, or here. First, you're only a young Incubator project once, and it's better to experiment and be as open and flexible as we can before a larger user base forces more conservative choices. Experiment while young. Second, nobody thus far has said this feature shouldn't be pluggable, just that it maybe shouldn't be pluggable yet. By the same token, if it turns out to be not fully baked, we can just make it, uhm, unpluggable.

          I like @sriram's naming suggestion:

          See above in my reply. I'm not married to the word cohort, but I do feel pretty strongly about not overloading a term that's already in use. Users are already confused about how partitioning works in Samza; there's no reason to make it worse. Cohort, as defined by Google, seems like the most applicable, not-yet-used term I could find:

          a group of people banded together or treated as a group.

          What exactly is the use case for GroupIntoNSets? If we decouple both checkpointing and state from the number of TaskInstances we have, why can't we just use GroupBySSP, and use the yarn.container.count to control the "N"?

          The use case is pretty much as Sriram describes - jobs that don't need any joining but have more SSPs than one would want to spawn a separate TI for (for reasons of metrics, reporting, etc.). A 1:1 correspondence between container count and the sets would be reasonable, but it's a little less flexible. Also, this ties the SSPGrouper into a YARN dependency. Not a big point though.

          Per-Martin Kleppmann, we shouldn't use the word "state" again:

          OK, how about CircumstanceLog? PredicamentLog? FootingFile?

          criccomini Chris Riccomini added a comment - - edited

          so if we stuff the task-state-partition-mapping into it, we're redefining the interface pretty dramatically and it's no longer really a CheckpointManager

          I agree. See my comment in (6) above. Perhaps we should rethink how we treat config, checkpoints, and SSP/cohort mappings? I think the main question I have is: if we had a generalized map topic where we keep config and cohorts, do we still need KAFKA-1000? Why split the "job state" between two places? Why not just put everything into one place (the topic)?

          I don't think it was the right choice with MessageChooser, or here. First, you're only a young Incubator project once and it's better to experiment and be open as flexible while we can before a larger user base forces more conservative choices.

          We disagree. This project is being used in production. I want to avoid backwards incompatibility as much as possible. The choice we made with MessageChooser saved us from having to re-write several jobs. The backwards-compatibility issue always seems very small when you make the change, and very big in a year's time.

          That said, I'm proposing in SAMZA-245 to change the API of SystemConsumer, which is backwards incompatible. The main difference between the two is that introducing the backwards incompatibility in SAMZA-245 gets us a direct benefit (improved performance). Introducing the risk of incompatibility here seems unnecessary, since the initial feature set covers a huge swath of the use cases, no one is actively asking for it (except us), and we can always open it up later by simply adding a config.

          Second, nobody thus far has said this feature shouldn't be pluggable, just that it maybe shouldn't be pluggable yet.

          Yes, that's what I'm saying. Don't make it pluggable yet. Let's let it bake before we open it up.

          I'm not married to the word cohort, but I am pretty strong about not overloading another term already used.

          The most unanimous feedback on this JIRA is that no one likes cohort. I like taskName because that's what the string is. The fact that the task name is attached to a set of SSPs is just a very small part of what the name will be used for. It'll show up in all metrics, logs, file directories, etc. Calling the thing a cohort in those scenarios makes no sense. The task is much more than a group of things banded together. It has state, offsets, logic, etc. Identifying it as a cohort doesn't make sense to me.

          OK, how about CircumstanceLog? PredicamentLog? FootingFile?

          SetupLog? It's essentially a list of changes required to set up a job before it starts, right? Not sure how I feel about the name, just spit-balling.

          martinkl Martin Kleppmann added a comment -

          I'm aware that adding another opinion here risks "design by committee" and just dragging things out further, so please feel free to ignore me. That said, FWIW in case it's useful:

          • Personally I think the term "cohort" is fine; I agree with Jakob about preferring a term that doesn't already have other meanings. If there is objection to the term "cohort" specifically, how about "shard"? I don't think that's used anywhere in Kafka or YARN. The word is unfortunately a bit tainted by MongoDB, but apart from that I think it has the right connotations.
          • I didn't see anyone pick up Jay Kreps's suggestion of storing the SSP/cohort mapping in Zookeeper, but I think it would be worth considering. Kafka already requires ZK, so it wouldn't be new infrastructure (just a new library dependency in the AM). ZK would likely be too expensive for frequently-changing things like checkpointed offsets, but probably ok for rarely-changing things like configuration and the SSP/cohort mapping.
          • No strong opinions about backwards compatibility at this stage.
          criccomini Chris Riccomini added a comment -

          I didn't see anyone pick up Jay Kreps's suggestion of storing the SSP/cohort mapping in Zookeeper, but I think it would be worth considering.

          For me, this falls into the same category as the KAFKA-1000 comment I had, above. If we introduce this SetupLog concept for the job, why not just use it for everything (SSP/Cohort assignment, checkpoints, and job config)? Using ZK for SSPs/Cohort mapping, and KAFKA-1000 for checkpoints seems more complicated than just having a single way to handle all of these use cases.

          SetupLog? It's essentially a list of changes required to set up a job before it starts, right? Not sure how I feel about the name, just spit-balling.

          Yes, commenting on my own comment. What about ConfigLog?

          garryturk Garry Turkington added a comment -

          Also late to the party here but a few thoughts:

          I'm with Martin/Jakob on having a liking for cohort. The reason is that I feel the other proposed names just say "this is a task/shard/whatever". But the creation of the thing has a degree of logic being applied and whatever term is used I think it's useful to help reinforce that the nature of how those SSPs were pulled together was far from arbitrary. And sorry Martin but I think shard is completely polluted by its past in the database world.

          Pluggability: I agree it's important to experiment in youth, but I also fear that if this ability is exposed, it will become something that new users of Samza start playing with and end up getting very confused or otherwise disillusioned with the system. I also think that the strategies proposed to be delivered within this JIRA will cover the vast majority of cases, and it's not obvious to me just how beneficial having this as a public pluggable capability would be.

          I was also interested in the idea from Jay re using ZK, but I get Chris' point on multiple solutions. Maybe it's conceptual, but I feel a bit odd about pushing increasing layers of config and state into Kafka (or indeed any other supported system). And on that point, is a reliance on KAFKA-1000 not going to add more complexity to other SystemProducer implementations?

          jghoman Jakob Homan added a comment -

          First, a couple meta notes with my Champion hat on, since this is Samza's first reasonably sized code debate and we have lots of new-to-ASF community members:

          • This type of involved, heavily quoted, polite but emphatic discussion is healthy and encouraged. Everyone should feel free to get involved; don't be intimidated by the length of the discussion thus far.
          • It may take a while, but we'll reach consensus on whatever the implementation is. These types of decisions need to be consensus rather than majority-vote to ensure there are no losing sides (or any sides, really). Consensus means everyone can live with the decision (not that they all love it though), whereas voting means there would be a group who got shut out entirely, and that's not healthy for the community.

          Now, I think there are four broad groups of issues in play in this JIRA:

          1. Those we agree on, mainly that this is a useful feature to have and that the broad approach as detailed in the design doc is the correct one for the moment.
          2. Those that questions have been raised about and have been answered, such as those raised by Jay Kreps, Sriram Subramanian and Chris Riccomini. Guys, are you ok with answers provided above, except those still under discussion as described below?
          3. Those that have been modified as part of the discussion (and I'll update the design doc with the new description):
            • GroupIntoNSets is confusing/not very useful as defined. Chris had a good idea to set the N there to be the same as the number of containers. This is likely its most common use case. This will move the code into some package that's aware of YARN since it requires a YARN-specific config (or we need to change the yarn container count config to be more generic, but that's out of scope here).
            • The bookkeeping necessary to support state and these new features should be kept away from the state log itself and moved into some central location with other per-job info (whatever form that takes)
          4. Finally, those questions on which we still do not have consensus:
            • Terminology: cohort. Looks like the participants thus far are numerically even, with those leaning against cohort more vehement than those for. I'm not wild about shard, as I think that's a pretty key term for databases and may lead people to think this grouping functions the same way. Task Name has a pretty big flaw: it conflates these tasks with map-reduce tasks, when in fact our closest analog is the Samza container. Since Samza is pitched as map-reduce for streams, it's worth keeping our kinda-the-same-but-not-really concepts as separate as possible lexically from map-reduce's.
            • Pluggable: yea or nay. This feature dramatically increases the power of the framework. It's a reasonable generalization of a previously hard-coded assumption about how to group inputs together. The provided interface is clean and gives a Config for supporting per-implementation values. It manipulates bedrock classes and concepts (SSPs and tasks/TaskInstances), which are very unlikely to change as Samza progresses. Having been heavily involved in creating, maintaining, deprecating and codifying APIs in Hadoop, which had quite a few production uses at the time, and in hashing out new APIs in ASF podlings, I'd strongly recommend a policy of openness now, clamping down as the project grows (particularly after a 1.0 release). Further, Samza is already pluggable in all its significant functionality - serdes, state stores, lifecycle listeners, checkpoint managers, metrics, stream job type, etc. - so keeping this functionality closed would be a big omission. The closest Hadoop analog class is the Partitioner, which plays a key role in making that framework extensible and useful. Samza's version should similarly be open and pluggable. Making it so now allows us to advertise the fact and invite people to play with it - where they can find and report its limitations if they exist - rather than bury it in the code. Essentially, it's a chicken-egg thing - people won't use this feature and find its limitations/build cooler things with Samza, if it's not a feature we publicize and make available. (A minimal sketch of what such a pluggable grouper interface could look like follows this list.)
            • Where to store the extra info needed for this feature. Non-Kafka-log based solutions like ZK may be a good idea, but would be a huge change that would blow up the size of the patch and ignite lots more good discussion. Creating a general purpose log (ConfigLog sounds good to me; it would hold the total config necessary for any SC to be started and would be written by the AM and read by each SC on startup) sounds like a good start. Immediate subsequent JIRAs can determine the fate of the checkpoint log, what else should go into the ConfigLog, etc. This approach seems like the smallest necessary change to implement the current JIRA and doesn't expose anything that can't be changed in the future, absent more discussion.
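          To make the pluggable-interface point above concrete, here is a minimal sketch of what such a grouper could look like. This is illustrative only: the trait name SSPGrouper, the Config-taking group signature, the stand-in SystemStreamPartition/Config case classes and the job.grouper.prefix.length key are assumptions for the example, not the API in the patch.

import scala.collection.immutable._

// Illustrative stand-ins; the real Samza classes differ.
case class SystemStreamPartition(system: String, stream: String, partition: Int)
case class Config(entries: Map[String, String]) {
  def get(key: String, default: String): String = entries.getOrElse(key, default)
}

// Hypothetical shape of a pluggable grouper: given the job config and the full
// set of input SSPs, return the groups, each of which becomes one task.
trait SSPGrouper {
  def group(config: Config, ssps: Set[SystemStreamPartition]): Set[Set[SystemStreamPartition]]
}

// The existing behavior expressed against that interface: one group per partition id.
class GroupByPartition extends SSPGrouper {
  def group(config: Config, ssps: Set[SystemStreamPartition]) =
    ssps.groupBy(_.partition).values.toSet
}

// A per-implementation value read from the Config: a prefix length used to group
// streams by a datacenter-style naming convention ("DC1-", "DC2-", ...).
class GroupByStreamPrefix extends SSPGrouper {
  def group(config: Config, ssps: Set[SystemStreamPartition]) = {
    val prefixLen = config.get("job.grouper.prefix.length", "3").toInt
    ssps.groupBy(_.stream.take(prefixLen)).values.toSet
  }
}
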
          criccomini Chris Riccomini added a comment - - edited

          Guys, are you ok with answers provided above, except those still under discussion as described below?

          Yep, I think so.

          GroupIntoNSets is confusing/not very useful as defined. Chris had a good idea to set the N there to be the same as the number of containers. This is likely its most common use case. This will move the code into some package that's aware of YARN since it requires a YARN-specific config (or we need to change the yarn container count config to be more generic, but that's out of scope here).

          I still feel like the strategy is just a bit weird. As I understand it, the reason that it exists is just to reduce the number of TIs (vs. just having one TI per partition and spreading the TIs evenly amongst containers). TIs are cheap, though. The only issue we've seen (that I can recall) with having a lot of TIs is that this can cause an explosion in the number of metrics that are reported. In the case of the JmxReporter and MetricsSnapshotReporter, there isn't any problem with this. Some other systems can have problems with the number of metrics being reported, but I think there are better solutions to that problem than introducing a grouping strategy to reduce the number of TIs. Better solutions would be to filter the metrics (either with a debug level or a blacklist), or just use metrics infrastructure that can handle 10k-100k metrics/job. I don't think we need this strategy.

          The bookkeeping necessary to support state and these new features should be kept away from the state log itself and moved into some central location with other per-job info (whatever form that takes)

          Agree.

          I'm not wild about shard, as I think that's a pretty key term for databases and may lead people to think this grouping functions the same way.

          Agree.

          Task Name has a pretty big flaw: it conflates these tasks with map-reduce tasks, when in fact our closest analog is the Samza container.

          The issue I take with adding any new word is that we already have the word "task" deeply ingrained in Samza (StreamTask, TaskInstance, TaskLifecycleListener, TaskContext, TaskCoordinator, TaskStorageManager, etc). Some of these are public, user-facing, highly visible APIs. The fact is, we have a task, and it means a certain thing. It's documented. Talks have been given on it. Adding a new word to describe a task, while still keeping the word "task" everywhere else, just makes things worse, not better.

          If we want to open up a separate discussion about refactoring everything to change the task stuff to something else, we can discuss that, but I see it as an orthogonal discussion to this issue. Simply adding "cohort" for one thing in this ticket, which diverges from the entire rest of the code-base, seems wrong to me. I remain pretty strongly convinced that taskName is in keeping with the rest of the code base as it exists today.

          Essentially, it's a chicken-egg thing - people won't use this feature and find its limitations/build cooler things with Samza, if it's not a feature we publicize and make available.

          I can live with this. (edit: by "I can live with this," I mean I can live with making the feature pluggable).

          This approach seems like the smallest necessary change to implement the current JIRA and doesn't expose anything that can't be changed in the future, absent more discussion.

          Broadly, I agree with you. I think the ConfigLog is totally something we should explore in a separate ticket to figure out what stuff should be added to it, how it would work, etc. My main concern here is about the sequencing of things. The smallest necessary change to implement the current JIRA is actually just to add the information to the existing checkpoint.

          Here's my concern with adding the ConfigLog as part of this JIRA: we don't think the implications of the ConfigLog all the way through because we want to limit the scope of this JIRA and finish it faster (with fewer code changes/less risk). As a result, we now have two topics: ConfigLog and the checkpoint topic. Then, we open up a second ConfigLog JIRA, and discover some flaw with it, or decide we want a different design. Now we have two topics to migrate (checkpoint log, and config log), as well as two different chunks of code to update/delete, and three different versions of a job (one with just checkpoints, one with checkpoint and config log from this jira, and one with checkpoint and config log from new jira). This is complicated.

          This concern leads me to want to just put the cohort/SSP and state mapping stuff into the existing Checkpoint class and use our existing CheckpointManager. As a follow on JIRA, we can do a nice detailed design doc on the ConfigLog, and think through things in more detail. Since we'll have to migrate the checkpoint no matter what, sticking the cohort/SSP stuff into the checkpoint shouldn't introduce any additional work, and not committing to a ConfigLog now will mean we'll feel more comfortable thinking openly about how exactly it should be implemented (rather than being tied to the implementation that falls out of this JIRA, which might or might not be the best approach when we dig into it in more detail).

          jkreps Jay Kreps added a comment -

          On the topic of cohort vs task id/name, let me try to give my rationale for what is clearly a somewhat subjective matter.

          Task is a well-defined thing in Samza. I agree that various systems have different notions of tasks and that could be confusing, but that is the terminology we have. A lot of thought actually went into this approach, because we first did it wrong (at first we conflated tasks and containers, coming from a MapReduce background ourselves), then realized all the flaws and redid it to what it is now.

          I think some of the disagreement may be more around the model than the terminology.

          Let me give a quick background explanation for why we separate tasks and containers, since there are lots of people on this thread. Essentially, what we realized is that you need both a unit of "logical execution" and one of "physical parallelism" for a job that runs forever. A batch system can conflate these because if you want to restart a running job with more resources it starts over from the very beginning, throwing away whatever intermediate results the tasks had accumulated. A stream processing job, though, never stops, and so if you allow any kind of state (whether backed by our changelogs or anything else) and you want to be able to guarantee any kind of correctness of semantics, you need your units of processing to not change. Since you obviously need to be able to scale up and down the parallelism of processing (how many CPUs/processes you get), you really want to separate the two concepts.

          So we can call this unit of logical execution any number of things. It could be a task, or a stream processing element, or a processor, or whatever. But if that unit is called an X, then we should really consider calling the unique identifier for that unit an X id or, if we must, an X name.

          I actually like the phrase cohort; the issue is just that we aren't using it elsewhere to denote what we currently call a task.

          jkreps Jay Kreps added a comment -

          Let me also try to explain my objection to the groupByN strategy or any strategy that mutates the set of tasks.

          The model I think we should be able to give is a state machine model. I.e. input comes in a deterministic manner to a deterministic task. In cases where we have multiple inputs we just need to stitch them together in a deterministic or repeatable manner to make this work.

          The advantages of this model are:
          1. It allows the maintenance of state along with each task.
          2. It allows a fault-tolerance model where we eliminate duplicate output from the task to provide exact semantics.

          If the identity of our tasks changes in the wrong way, the right way to split up state (or whether that can be done at all) somewhat disappears.

          Likewise the ability to suppress duplicate output is based on a notion of identity for the task that will give wrong answers if the inputs to that task are dramatically changed.

          I think these are two really core and important features so I just really really want to make sure we think through the implications of any change here holistically.

          What this raises, though, is something that we haven't really addressed head-on in the current model: what kinds of partition assignment changes are actually okay?

          Partition assignment changes can happen in only two cases: when a topic is added or deleted.

          Interestingly, these cases both seem to work out okay. The semantics of adding a topic are that you add those partitions to the appropriate tasks and they start processing from the beginning. It is essentially the same as if they had always been assigned those partitions but the data was extremely delayed in arrival. The delete case is also okay, I think. In this case the effect is as if data just stops arriving from that topic.

          What will certainly break things would be a case where a partition moves from one task to another or the total set of tasks changes. I actually really think the right thing to do is just disallow those kinds of changes. I agree that sometimes you may be able to reason that it is okay, but I think it is more dangerous than useful.
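          To illustrate the kind of guard that could enforce this, here is a sketch (with assumed stand-in types; this is not code from the patch) that accepts added and removed SSPs but rejects an SSP moving between tasks or a task disappearing.

import scala.collection.immutable._

case class SystemStreamPartition(system: String, stream: String, partition: Int)
case class TaskName(name: String)

// Allow new SSPs to appear (topic added) and SSPs to vanish (topic deleted), but
// reject any SSP that moved from one TaskName to another and any TaskName that
// disappeared from the proposed assignment.
def validateAssignmentChange(previous: Map[TaskName, Set[SystemStreamPartition]],
                             proposed: Map[TaskName, Set[SystemStreamPartition]]): Unit = {
  def owners(assignment: Map[TaskName, Set[SystemStreamPartition]]) =
    assignment.toSeq.flatMap { case (task, ssps) => ssps.map(_ -> task) }.toMap

  val before = owners(previous)
  val after = owners(proposed)

  val moved = before.keySet.intersect(after.keySet).filter(ssp => before(ssp) != after(ssp))
  if (moved.nonEmpty)
    throw new IllegalArgumentException("SSPs moved between tasks: " + moved.mkString(", "))

  val droppedTasks = previous.keySet.diff(proposed.keySet)
  if (droppedTasks.nonEmpty)
    throw new IllegalArgumentException("Tasks disappeared: " + droppedTasks.mkString(", "))
}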

          The GroupByN strategy does fix a real problem at LinkedIn for large jobs where we record metrics at the task level. But this problem is so specific to our environment that I really, really would rather not bake that constraint into the system (the LinkedIn ingraphs people are actually trying to fix this anyway, so the problem may disappear in the next few months).

          Do we have other ideas for grouping strategies other than GroupBySSP and GroupByPartition? If we were able to otherwise scale the task count I think these are the only two semantically different cases I can think of. I would be most comfortable with an implementation that just exposed something like

            co.partition.streams=true
          

          although I agree that is more limited.

          jkreps Jay Kreps added a comment -

          So to "checkpoint" my comments:

          • I am violently pro task id/name
          • I see the possible value of being able to use semantics of the topic name to override the grouping as in the datacenter example, so I don't really object to making the grouping pluggable.
          • My real concern is around safety and semantics if the set of tasks changes. If we introduce options that, if set incorrectly, corrupt your output, I think people will tend to hurt themselves (and then we will spend a ton of time trying to debug). So I would like to think through the strongest validations we can do and how those would work. I think validation should be part of this feature, though it need not be part of this patch.
          • I agree that we need to think through the checkpointing/config log stuff.
          sriramsub Sriram Subramanian added a comment -

          1. Naming. I understand that your naming was focussed on the partition grouping rather than who owns the partitions. If the grouping could be freely moved between tasks then I agree it should have a different name to avoid confusion. However, this is not possible. The task does represent the grouping of partitions and hence it is better for them to be tightly coupled. Task id / name seems to be more appropriate to indicate that.
          2. I am still not all in with making the grouping api public. The main reason is that it encourages anyone to write a grouping implementation that reassigns partitions to tasks and the behavior is undefined. We can definitely catch these cases and warn or error but I think that is not a great user experience. However, I am willing to experiment here since there seems to be a consensus on this.

          jghoman Jakob Homan added a comment -

          OK, I think we're good to go.

          • Cohort: I still believe there's value in this term, but it's not worth holding the entire JIRA up over. Task name seems to be acceptable to most. Let's go with that.
          • Pluggable: We've reached consensus on this. The interface will be public and pluggable.
          • IntoNSets: Since the interface is pluggable, it'll work to hold off on including this strategy in the current patch and open a new JIRA to add it. That way if users want it, they'll have an easy place to register their interest and we can commit it quickly.
          • ConfigLog vs Checkpoint Log: The general agreement is that the checkpoint log will shortly undergo significant changes (via KAFKA-1000). Additionally, we're pretty clear that we're going to need some type of more general log that may or may not include some of the checkpoint log's current functionality. Let's move the whole discussion of what that config log will look like into another JIRA and stash the state-log related info necessary for this JIRA into the checkpoint log (this is assuming it's easy to tie the state log into the checkpoint log in a not overly ugly way).
          martinkl Martin Kleppmann added a comment -

          Sounds good. Thank you everyone for explaining your reasoning so clearly. It's really useful to have the thinking behind Samza's design decisions articulated and recorded for future reference.

          jghoman Jakob Homan added a comment -

          Attaching draft patch.

          • Having lots of trouble getting Scala to like my TaskNameToSSPs map, so may need to jettison that even though it makes the code much cleaner.
          • Working on merging in SAMZA-253, which is a big change on top of this. Will upload a merged in version by EOD, hopefully.
          • All tests pass except TestStatefulTask, which I believe is connected to the Scala map problems.
          • Holding off on re-writing the Offset tool until the overall approach has been checked.
          criccomini Chris Riccomini added a comment -

          Any chance you could open an RB up for this?

          criccomini Chris Riccomini added a comment -

          Also, the -draft patch doesn't apply to trunk. I'm assuming it's because of SAMZA-253. Do you want me to wait for the rebase before reviewing?

          jghoman Jakob Homan added a comment -

          Attaching patch rebased off master. Still has GroupIntoNSets just to show how it would work, but all tests pass. Still need to re-write the offsets tool after we get further into the review. Not crazy about how TaskNameToSSPs came out; implementing a Map in Scala is insanely annoying. May be better to just go with a straight map (possibly type-aliased) and helper methods.

          jghoman Jakob Homan added a comment -

          rb: https://reviews.apache.org/r/22215/

          jghoman Jakob Homan added a comment -

          Patch drifted from the master branch. Rebased.

          martinkl Martin Kleppmann added a comment -

          I just had a thought: am I right in thinking that this mechanism can also be used to implement "broadcast streams", i.e. a stream for which all partitions are consumed by all task instances in a job? (Previously discussed here, for example.) It seems that this would simply require a SSPGrouper which puts the SSPs for the broadcast stream in the set of SSPs for each of the task instances. Is that right?

          jghoman Jakob Homan added a comment -

          Correct. The SSPGrouper could even be a wrapper around some other grouper that appends the SSP to all the groups after the wrapped one has done its thing.
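          For illustration, such a wrapper might look like the following sketch (the SSPGrouper trait shape and the stand-in SystemStreamPartition class here are assumptions for the example, not the interface in the patch):

import scala.collection.immutable._

case class SystemStreamPartition(system: String, stream: String, partition: Int)

trait SSPGrouper {
  def group(ssps: Set[SystemStreamPartition]): Set[Set[SystemStreamPartition]]
}

class GroupByPartition extends SSPGrouper {
  def group(ssps: Set[SystemStreamPartition]) = ssps.groupBy(_.partition).values.toSet
}

// Wraps any other grouper and appends every partition of the broadcast stream
// to every group the wrapped grouper produced.
class BroadcastWrappingGrouper(wrapped: SSPGrouper, broadcastStream: String) extends SSPGrouper {
  def group(ssps: Set[SystemStreamPartition]) = {
    val (broadcast, regular) = ssps.partition(_.stream == broadcastStream)
    wrapped.group(regular).map(_ ++ broadcast)
  }
}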

          jghoman Jakob Homan added a comment -

          Uploading latest patch post rb comments. Will update rb as well.

          jghoman Jakob Homan added a comment -

          Small update to the last patch to fix how the partition table from the AM is handled, following changes that went in previously.

          jghoman Jakob Homan added a comment -

          Latest patch from Chris' review.

          jghoman Jakob Homan added a comment -

          Latest revision based on Chris' feedback.

          criccomini Chris Riccomini added a comment -

          Reviewed the latest patch. No further comments at this time.

          martinkl Martin Kleppmann added a comment -

          The latest patch doesn't quite cleanly apply on master. Other than that, I am ok with committing this. There are some remaining issues, but we can address them in follow-on tickets:

          • Backwards-compatibility or conversion of checkpoints (allow Samza 0.8 to read Samza 0.7 checkpoint)?
          • CheckpointTool is broken and needs to be rewritten
          • Update docs on the website to describe the new world order and new configuration properties
          • If the same SSP is assigned to several TaskInstances, messages from that SSP should be delivered to all matching TaskInstances, not just one
          jghoman Jakob Homan added a comment -

          Latest and hopefully final iteration on patch. Comments on rb.

          jghoman Jakob Homan added a comment -

          Backwards-compatibility or conversion of checkpoints (allow Samza 0.8 to read Samza 0.7 checkpoint)?

          I'd like to handle that as a separate tool that reads an old checkpoint and writes a new one in the new format. Shouldn't be too hard and I'll open a separate JIRA.

          CheckpointTool is broken and needs to be rewritten

          Done.

          Update docs on the website to describe the new world order and new configuration properties

          Done.

          If the same SSP is assigned to several TaskInstances, messages from that SSP should be delivered to all matching TaskInstances, not just one

          Added a check that we don't have the same SSP assigned to multiple TaskNames, which throws a start-up error if so. The ability to add the same SSP to multiple TaskNames is useful, but we should think through all the implications (and then support that type of delivery in the code). Another JIRA...
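          As a rough illustration of what that start-up check could look like (a sketch with assumed stand-in types, not the code in the patch): build a reverse index from SSP to the TaskNames that claim it and fail if any SSP has more than one owner.

import scala.collection.immutable._

case class SystemStreamPartition(system: String, stream: String, partition: Int)
case class TaskName(name: String)

// Throws at start-up if any SSP is assigned to more than one TaskName.
def validateNoSharedSSPs(assignment: Map[TaskName, Set[SystemStreamPartition]]): Unit = {
  val owners = assignment.toSeq
    .flatMap { case (taskName, ssps) => ssps.map(ssp => (ssp, taskName)) }
    .groupBy { case (ssp, _) => ssp }
    .map { case (ssp, pairs) => (ssp, pairs.map(_._2)) }
  val shared = owners.filter { case (_, taskNames) => taskNames.size > 1 }
  if (shared.nonEmpty)
    throw new IllegalArgumentException("Same SSP assigned to multiple TaskNames: " + shared.mkString(", "))
}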

          jghoman Jakob Homan added a comment -

          Hit publish too quickly on rb, so here are the changes since the last patch:

          • More unit tests.
          • Added grouper class name into checkpoint so we don't load checkpoints created by other groupers
          • Fixed bug in dividing TaskNames into the requested number of containers (one way to do that split is sketched after this list)
          • Rewrote checkpoint tool
          • Refactored out Kafka checkpoint key to its own class
          • Refactored KafkaCheckpointManager to avoid code duplication
          • Fixed TestStatefulTask test to actually check this code.
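          Regarding the container-division bullet above, one straightforward way to do the split is round-robin over a stable ordering of the TaskNames; the sketch below (with an assumed TaskName stand-in) shows that idea, and is not necessarily the exact logic in the patch.

import scala.collection.immutable._

case class TaskName(name: String)

// Round-robin TaskNames across containerCount containers; container i gets every
// containerCount-th TaskName. Sorting first keeps the assignment stable across
// restarts as long as the set of TaskNames is unchanged.
def divideIntoContainers(taskNames: Set[TaskName], containerCount: Int): Map[Int, Set[TaskName]] = {
  require(containerCount > 0, "container count must be positive")
  taskNames.toSeq
    .sortBy(_.name)
    .zipWithIndex
    .groupBy { case (_, idx) => idx % containerCount }
    .map { case (container, pairs) => (container, pairs.map(_._1).toSet) }
}
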
          jghoman Jakob Homan added a comment -

          Got a +1 from Chris on review board. Committed. Thanks, everyone!


            People

            • Assignee:
              jghoman Jakob Homan
            • Reporter:
              jghoman Jakob Homan
            • Votes:
              0
            • Watchers:
              8
