The current commit order for a task instance is (https://github.com/apache/samza/blob/1d458050c06deefc1dcf3d3e9534631aece64553/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala#L193):
1. storeManager.flush() (flushes store contents to disk and writes the changelog OFFSET file)
2. collector.flush() (flushes producer buffers for both task output and changelog streams)
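The two steps above can be sketched as follows. The interfaces here are hypothetical stand-ins for Samza's TaskStorageManager and the task's collector, not the real API:

```java
import java.util.ArrayList;
import java.util.List;

public class CurrentCommitOrder {
    // Hypothetical stand-ins; real signatures in Samza may differ.
    interface StoreManager { void flush(); } // persists store contents and writes the changelog OFFSET file
    interface Collector { void flush(); }    // flushes producer buffers (output + changelog)

    static void commit(StoreManager storeManager, Collector collector) {
        // Current order: the store is made durable *before* the in-flight
        // output/changelog messages are flushed. A crash between the two
        // calls leaves durable store state whose corresponding output and
        // changelog messages were never sent.
        storeManager.flush(); // step 1
        collector.flush();    // step 2
    }

    public static void main(String[] args) {
        List<String> order = new ArrayList<>();
        commit(() -> order.add("store"), () -> order.add("collector"));
        System.out.println(order); // [store, collector]
    }
}
```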
Consider a scenario where the store is used to de-dupe incoming messages by recording previously seen message UIDs. If the container dies during commit, between steps 1 and 2, and restarts on the same host (due to host affinity), it will treat the persisted store contents on disk as the source of truth. Some incoming messages will then not be (re-)processed, even though their output was never flushed. And since their changelog entries weren't flushed either, the behavior differs depending on whether the container restarts on the same host or on another host.
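To make the failure concrete, here is a minimal sketch of the de-duping pattern. An in-memory set stands in for the persistent KV store and a list for the unflushed output buffer; this is illustrative, not the Samza KeyValueStore API:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DedupSketch {
    final Set<String> seenUids = new HashSet<>();  // stands in for the persistent store
    final List<String> output = new ArrayList<>(); // stands in for the unflushed output buffer

    // Record the UID and emit output only for first-time messages.
    void process(String uid, String payload) {
        if (seenUids.contains(uid)) {
            return; // duplicate: drop silently
        }
        seenUids.add(uid);   // store.put(uid, ...)
        output.add(payload); // collector.send(...)
    }
}
```

If the store's contents survive a crash (step 1 completed) but the output buffer did not (step 2 never ran), a restart on the same host finds the UID already recorded and drops the redelivered message, even though its output never reached the output stream.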
There are two issues here:
- Output messages need to be flushed before flushing the store changelog messages.
- Store changelog messages need to be flushed before the store contents are persisted to disk.
Note: LoggedStore can be flushed independently of TaskInstance#commit() (e.g. by CachedStore). Furthermore, the underlying raw store (e.g. RocksDB) can flush its in-memory contents to disk independently of LoggedStore#flush().
One solution would be to:
a. Change LoggedStore#put to call collector.send() before calling the underlying store's put().
b. Change the commit order to the following:
1. output collector.flush()
2. changelog collector.flush() (requires separating output and changelog system producers)
3.1. get changelog offset
3.2. flush store contents to disk
3.3. write changelog OFFSET file
c. Document that if users need an at-least-once guarantee AND de-duping, they should turn on synchronous send for the output and changelog system producers and call commit after processing each message.
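Steps (a) and (b) together could look roughly like this. The interfaces are hypothetical: the separate output and changelog producers from step 2 are an assumption, and changelogOffset/persistToDisk/writeOffsetFile are illustrative names, not existing Samza methods:

```java
import java.util.ArrayList;
import java.util.List;

public class ProposedCommitOrder {
    interface Producer { void flush(); }
    interface Store {
        long changelogOffset();            // 3.1: latest flushed changelog offset
        void persistToDisk();              // 3.2: flush in-memory store contents to disk
        void writeOffsetFile(long offset); // 3.3: record the offset the on-disk state corresponds to
    }

    // (a) LoggedStore#put would analogously call collector.send(entry)
    //     *before* the underlying store.put(), so every record that can end
    //     up on disk already has a changelog entry in the producer buffer.
    static void commit(Producer output, Producer changelog, Store store) {
        output.flush();                        // 1. output messages first
        changelog.flush();                     // 2. then changelog messages
        long offset = store.changelogOffset(); // 3.1 (changelog is already durable up to here)
        store.persistToDisk();                 // 3.2
        store.writeOffsetFile(offset);         // 3.3
    }
}
```

With this ordering, any store contents that reach disk are covered by already-flushed changelog and output messages, so a same-host restart can no longer see store state that is "ahead" of the streams.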