Samza / SAMZA-180

Support one-time offset reset for a Samza job

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.6.0
    • Fix Version/s: 0.7.0
    • Component/s: container
    • Labels: None

      Description

      Samza currently has a systems.%s.streams.%s.samza.reset.offset configuration. When set to "true", this configuration tells each SamzaContainer to disregard the checkpointed offsets for a stream when starting up. The problem with this configuration is that the checkpoints are disregarded every time the SamzaContainer starts up, not just the first time. If a host that a SamzaContainer is running on fails, and YARN (or some other mechanism) restarts the SamzaContainer, the container will not pick up where it left off, but will instead disregard the checkpointed offsets, and start over again, as before.

      There are some use cases where developers wish to have a one-time reset of the checkpointed offsets. That is, they want to reset the offsets exactly once, but then have failures not trigger another reset. This is typically useful in bootstrapping cases (related to SAMZA-179), where a developer wishes to reset the task back to offset 0, process all messages up to the head of a stream, and then shut down. Right now, the developer can set reset.offset=true and auto.offset.reset=smallest (if reprocessing a Kafka topic), but if the container ever restarts, processing will begin again from offset 0. This is not ideal.
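
      For illustration, the behaviour described above comes from job configuration along these lines. This is only a sketch: the system ("kafka") and stream ("PageViewEvent") names are hypothetical, chosen to make the key pattern concrete.

          # Hypothetical system/stream names; the key pattern is the point.
          # Discard checkpointed offsets for this stream on every container start:
          systems.kafka.streams.PageViewEvent.samza.reset.offset=true
          # Kafka consumer setting so that, once the checkpoint is discarded,
          # consumption restarts from the earliest available offset:
          systems.kafka.consumer.auto.offset.reset=smallest

      Because these keys are ordinary static job config, every container (re)start re-applies the reset, including restarts triggered by failures.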

      Attachments

      1. SAMZA-180.1.patch
         13 kB
         Martin Kleppmann
      2. SAMZA-180.5.patch
         25 kB
         Martin Kleppmann

        Issue Links

        • relates to: SAMZA-142, SAMZA-157, SAMZA-179

          Activity

          criccomini Chris Riccomini added a comment -

           As part of this, it's also useful for developers to be able to do a one-time offset reset to a specific offset (e.g. set an SSP offset to 1234).

          criccomini Chris Riccomini added a comment -

          Linking to SAMZA-157 and SAMZA-142 for discussion on forced offsets.

          jghoman Jakob Homan added a comment -

          Wouldn't this require re-writing and passing in a new config to subsequent tasks after a failure? Or are you thinking of another method?

           criccomini Chris Riccomini added a comment - edited

          I can think of several potential solutions:

           1. Dynamic config, as you suggest.
           2. A CLI-based tool that messes with checkpoints.
           3. Some kind of logic that notes when a job starts (maybe an incrementing number in the checkpoint) and resets only once per job start, not once per container start.
           4. An interface that's invoked at job start time, letting developers fiddle with things themselves.

          None of these are really all that well thought out. Brainstorming.

          criccomini Chris Riccomini added a comment -

           I should also note that some of the above proposed solutions are "once-per-job-execution" while others are just "once" (i.e. a single one-time event triggered by the dev, vs. once every time the job starts). None of them should be per-container-start, which is what we have now with reset.offset.

          martinkl Martin Kleppmann added a comment -

          My $0.02: a CLI tool to write something to the checkpoints topic sounds like the cleanest solution to me. If I understand it correctly, the easiest procedure would be: an admin shuts down the job (so that it stops publishing to the checkpoints topic), then a CLI tool is used to write a new offset to each partition of the checkpoints topic, then the job is restarted. This approach doesn't require any special logic in Samza itself, just a small CLI tool (which can probably be quite simple).
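
           Sketched as an operational runbook, the procedure would look roughly like the following. The checkpoint-writing tool and its flags are hypothetical (no such script exists yet at this point in the discussion); run-job.sh is the existing Samza launcher.

               # 1. Stop the job so nothing else writes to the checkpoints topic
               #    (however the job is normally stopped, e.g. via YARN).

               # 2. Write the desired offset for each input partition into the
               #    checkpoints topic (hypothetical tool and flags):
               write-checkpoint.sh --config-path=file:///path/to/job.properties \
                 --offset systems.kafka.streams.PageViewEvent.partitions.0=0

               # 3. Restart the job; containers resume from the rewritten checkpoints.
               run-job.sh --config-path=file:///path/to/job.properties   # plus the usual --config-factory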

          criccomini Chris Riccomini added a comment -

          Yeah, that's how I'm leaning as well. I think Jay Kreps is also thinking along those lines.

          martinkl Martin Kleppmann added a comment -

          I have implemented a simple command-line tool as discussed: https://reviews.apache.org/r/19481/

          I've added a gradle task to make it easy to run from the source repo. For a binary distribution we'd probably also want to include a shell script, but it wasn't obvious to me where that ought to go. Should it just live in samza-shell alongside run-job.sh and friends?

          jghoman Jakob Homan added a comment -

           Been thinking about this some more. All of the solutions above seem a bit hacky, because the use case goes against what Samza offers right now: static configuration after the job starts up (once any config rewriters have had their say). I had thought that maybe 'one-time' configs could be introduced that would be fed to the first instance of a container but not to subsequent ones. But since there is no AM-to-SamzaContainer communication after the container starts, there is no way for a container to signal to the AM that it has done what it needs to do and is ready for those 'next-time' configs.

           Fundamentally, this use case isn't a streaming one but a batch one, and hence it will always be awkward in Samza. As such, the tool described above may be the best bet, as it's the least invasive to the framework and the easiest to remove once there's a better approach.

          martinkl Martin Kleppmann added a comment -

           I hear what you're saying, and I think it depends a bit on how people want to use this one-time reset facility.

          1. If people want to regularly restart a job, and have it consume from offset 0 every time it is started, then I agree a one-time config would be the right way of doing this. As you say, that's more of a batch use case.
          2. If people just want the ability to manually change the state of the infrastructure occasionally, then I think giving them a tool to manipulate checkpoints is ok. Like rebalancing a Kafka cluster, or upgrading YARN, that would be an operational task that is only performed occasionally. It's not a particularly good approach if you want to consume from offset 0 every time the job starts up.

          Which of the two was the purpose for this feature?

          criccomini Chris Riccomini added a comment -

           As Jakob Homan said, this is really more of a batch use case, and I agree that the least invasive/easiest-to-delete approach is best, so I support the CLI-based tool.

           Also, a second use case that I think the CLI is really good for is the ops/SRE side of things, where something went bad and we just want to force the job to go back some number of messages to re-process the "bad" messages again. This falls under use case (2) in the list above.

           Re: shell script, yeah, it should just live in samza-shell alongside the other run-* scripts.

          martinkl Martin Kleppmann added a comment -

          I've added the shell script wrapper: https://reviews.apache.org/r/19481/
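
           For reference, usage of the wrapper ends up looking roughly like the sketch below. The flag names and the offsets-file key format are inferred from the discussion, so treat them as assumptions rather than the final committed interface.

               # Print the latest checkpointed offsets for the job (sketch):
               checkpoint-tool.sh --config-path=file:///path/to/job.properties

               # Overwrite the checkpoints from a properties file (sketch):
               checkpoint-tool.sh --config-path=file:///path/to/job.properties \
                 --new-offsets=file:///path/to/new-offsets.properties

               # new-offsets.properties: one entry per system/stream/partition, e.g.
               # systems.kafka.streams.PageViewEvent.partitions.0=1234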

          martinkl Martin Kleppmann added a comment -

          Chris Riccomini: Thanks for your review. I've updated the RB based on your comments: https://reviews.apache.org/r/19481/

          martinkl Martin Kleppmann added a comment -

          Chris Riccomini: Updated RB to revision 4, incorporating your comments.

          martinkl Martin Kleppmann added a comment -

          Attaching the latest (r5) version of the patch on the RB, rebased onto master. (Only a minor conflict due to the renaming of stateTopic to checkpointTopic.) All tests pass.

          martinkl Martin Kleppmann added a comment -

          Got a "ship it" on the RB. Thanks for reviewing! I've committed this. Resolving.


            People

            • Assignee: martinkl Martin Kleppmann
            • Reporter: criccomini Chris Riccomini
            • Votes: 0
            • Watchers: 3
