SAMZA-1065: Change the commit order to support at-least-once processing when deduping with local state store

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.12.0
    • Component/s: None
    • Labels: None

      Description

      The current commit order for a task instance is (https://github.com/apache/samza/blob/1d458050c06deefc1dcf3d3e9534631aece64553/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala#L193):
      1. storeManager.flush() (flushes store contents to disk and writes the changelog OFFSET file)
      2. collector.flush() (flushes producer buffers for both task output and changelog streams)
      3. offsetManager.checkpoint(taskName)

      Consider a scenario where the store is used to de-dupe incoming messages by recording previously seen message UIDs. If the container dies during commit between steps 1 and 2 and restarts on the same host (due to host affinity), it treats the store contents persisted on disk as the source of truth. As a result, some incoming messages are not (re-)processed even though their output was never flushed. Because their changelog entries were not flushed either, the behavior differs depending on whether the container restarts on the same host or on a different one.
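The failure window described above can be illustrated with a toy model. This is a minimal sketch, not Samza code: the class, its fields, and its methods are all hypothetical stand-ins for the store, the producer buffer, and the container lifecycle, and the checkpoint step is not modeled.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of a dedup task under the original commit order.
// All names are hypothetical; this is not Samza code.
class OldCommitOrderSimulation {
    // Durable state: survives a container crash.
    final Set<String> storeOnDisk = new HashSet<>();
    final List<String> outputStream = new ArrayList<>();

    // Volatile state: lost when the container dies.
    Set<String> storeInMemory = new HashSet<>();
    List<String> outputBuffer = new ArrayList<>();

    // Emit each UID at most once, using the store to remember seen UIDs.
    void process(String uid) {
        if (storeInMemory.add(uid)) {
            outputBuffer.add(uid);
        }
    }

    // Original order: store flush first, then producer flush.
    void commit(boolean crashAfterStoreFlush) {
        storeOnDisk.addAll(storeInMemory);   // step 1: storeManager.flush()
        if (crashAfterStoreFlush) {
            return;                          // container dies before step 2
        }
        outputStream.addAll(outputBuffer);   // step 2: collector.flush()
        outputBuffer.clear();
        // step 3 (checkpoint) is not modeled
    }

    // Same-host restart: the on-disk store is trusted, buffers are gone.
    void restartOnSameHost() {
        storeInMemory = new HashSet<>(storeOnDisk);
        outputBuffer = new ArrayList<>();
    }

    static List<String> run() {
        OldCommitOrderSimulation task = new OldCommitOrderSimulation();
        task.process("m1");
        task.process("m2");
        task.commit(true);           // crash between steps 1 and 2
        task.restartOnSameHost();
        task.process("m1");          // input replayed from the last checkpoint
        task.process("m2");
        task.commit(false);
        return task.outputStream;
    }

    public static void main(String[] args) {
        System.out.println(run());   // prints [] : m1 and m2 were never emitted
    }
}
```

In this model the replayed messages are treated as duplicates because their UIDs are already on disk, so their output is lost for good.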

      There are two issues here:

      • Output messages need to be flushed before flushing the store changelog messages.
      • Store changelog messages need to be flushed before the store contents are persisted to disk.

      Note: the LoggedStore can be flushed independently from a TaskInstance#commit() (e.g. in CachedStore). Furthermore, the underlying raw store (e.g. RocksDB) can flush its in-memory contents to disk independently from LoggedStore#flush().

      One solution would be to:
      a. Change LoggedStore#put to call collector.send() before calling the underlying store.put().
      b. Change the commit order to the following:
         1. output collector.flush()
         2. changelog collector.flush() (requires separating the output and changelog system producers)
         3.1. get changelog offset
         3.2. taskStores.flush()
         3.3. write changelog OFFSET file
         4. offsetManager.checkpoint()
      c. Document that users who need both an at-least-once guarantee and deduping should turn on synchronous send for the output and changelog system producers and call commit after processing each message.
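The proposed ordering can be sketched with a similar toy model. Everything here is a hypothetical illustration rather than the actual patch: the class and method names are invented, the checkpoint and OFFSET file are not modeled, and a restart is assumed to restore the store from disk and then replay the whole flushed changelog (real restores replay only from the recorded offset).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of a dedup task under the proposed commit order.
// All names are hypothetical; this is not Samza code.
class ProposedCommitOrderSimulation {
    // Durable state: survives a container crash.
    final List<String> outputStream = new ArrayList<>();
    final List<String> changelogStream = new ArrayList<>();
    final Set<String> storeOnDisk = new HashSet<>();

    // Volatile state: lost when the container dies.
    Set<String> storeInMemory = new HashSet<>();
    List<String> outputBuffer = new ArrayList<>();
    List<String> changelogBuffer = new ArrayList<>();

    // (a) Send the changelog entry before writing to the underlying store.
    void put(String uid) {
        changelogBuffer.add(uid);
        storeInMemory.add(uid);
    }

    void process(String uid) {
        if (storeInMemory.contains(uid)) {
            return;                                  // already seen: drop
        }
        put(uid);
        outputBuffer.add(uid);
    }

    // (b) crashAfterStep: 0 = no crash, n = container dies after step n.
    void commit(int crashAfterStep) {
        outputStream.addAll(outputBuffer);           // step 1: flush output
        outputBuffer.clear();
        if (crashAfterStep == 1) return;
        changelogStream.addAll(changelogBuffer);     // step 2: flush changelog
        changelogBuffer.clear();
        if (crashAfterStep == 2) return;
        storeOnDisk.addAll(storeInMemory);           // step 3: flush store
        // step 4 (checkpoint) is not modeled
    }

    // Assumption: restore = on-disk store plus replay of the flushed changelog.
    void restartOnSameHost() {
        storeInMemory = new HashSet<>(storeOnDisk);
        storeInMemory.addAll(changelogStream);
        outputBuffer = new ArrayList<>();
        changelogBuffer = new ArrayList<>();
    }

    static List<String> run(int crashAfterStep) {
        ProposedCommitOrderSimulation task = new ProposedCommitOrderSimulation();
        task.process("m1");
        task.process("m2");
        task.commit(crashAfterStep);
        task.restartOnSameHost();
        task.process("m1");                          // input replayed after restart
        task.process("m2");
        task.commit(0);
        return task.outputStream;
    }

    public static void main(String[] args) {
        System.out.println(run(1));  // crash after output flush
        System.out.println(run(2));  // crash after changelog flush
    }
}
```

In this model a crash after step 1 yields duplicate output (each message emitted twice) and a crash after step 2 yields each message once; in neither case is a message lost, which is the at-least-once property the reordering is meant to preserve.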

          Activity

          pmaheshwari Prateek Maheshwari added a comment -

          Re: 3.1, 3.2 and 3.3:
          I think the current order (taskStores.flush, get changelog offset, write OFFSET file) should be fine, since commit() is exclusive with process() and window(), so no new messages are written to the store during commit.

          githubbot ASF GitHub Bot added a comment -

          GitHub user prateekm opened a pull request:

          https://github.com/apache/samza/pull/35

          SAMZA-1065: Change the commit order to support at least once processing when using local state store for deduping.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/prateekm/samza commit-order

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/samza/pull/35.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #35


          commit 63a38d3fba035cc465abc3dc4258d8a565807fb3
          Author: Prateek Maheshwari <pmaheshw@linkedin.com>
          Date: 2016-12-22T22:55:25Z

          SAMZA-1065: Change the commit order to support at least once processing when deduping with local state store.


          githubbot ASF GitHub Bot added a comment -

          GitHub user asfgit closed the pull request at:

          https://github.com/apache/samza/pull/35

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Merged and submitted. Thanks!


            People

            • Assignee: Prateek Maheshwari (pmaheshwari)
            • Reporter: Prateek Maheshwari (pmaheshwari)
            • Votes: 0
            • Watchers: 3
