I noticed that the SamzaContainer checkpoints immediately on startup. This is fairly useless, since at that point the container has processed at most one message. The issue is that lastWindowMs and lastCommitMs are both initialized to 0:
The logic that's triggered after the RunLoop starts is:
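Of course, this is always true on the first pass. When lastCommitMs starts at 0, lastCommitMs + commitMs is just commitMs, which is far smaller than any real clock() reading, so it will always be < clock(). The same logic is triggered for windowing (lastWindowMs + windowMs).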
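To make the arithmetic concrete, here is a minimal Java sketch of the check as described above (`lastCommitMs + commitMs < clock()`). It is not Samza's actual RunLoop code: the class `CommitTimerBug`, the method `maybeCommit`, and the injectable `LongSupplier` clock are hypothetical names used only for illustration.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of the commit-interval check described in this issue; not Samza's code.
class CommitTimerBug {
    private final long commitMs;
    private final LongSupplier clock;
    // Starts at 0, which is the behavior this issue describes.
    private long lastCommitMs = 0L;

    CommitTimerBug(long commitMs, LongSupplier clock) {
        this.commitMs = commitMs;
        this.clock = clock;
    }

    // Returns true if a commit (checkpoint) would happen on this iteration.
    boolean maybeCommit() {
        long now = clock.getAsLong();
        if (lastCommitMs + commitMs < now) {
            lastCommitMs = now;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        // With lastCommitMs = 0, the first check compares 60000 against the
        // current epoch time in milliseconds, so it fires right away.
        CommitTimerBug timer = new CommitTimerBug(60_000L, System::currentTimeMillis);
        System.out.println(timer.maybeCommit()); // true: checkpoint on startup
        System.out.println(timer.maybeCommit()); // false: the next one waits commitMs
    }
}
```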
It seems like we should set lastCommitMs and lastWindowMs to clock() if they're 0, or just set them to clock() when the RunLoop is instantiated.
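A minimal Java sketch of the second option (initialize the timestamp to clock() at construction), under the same assumptions as before: `CommitTimerFixed`, `maybeCommit`, and the injected `LongSupplier` clock are hypothetical, and the condition is the one described in this issue. A fake clock makes the timing deterministic.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of the proposed fix; names are illustrative, not Samza's API.
class CommitTimerFixed {
    private final long commitMs;
    private final LongSupplier clock;
    private long lastCommitMs;

    CommitTimerFixed(long commitMs, LongSupplier clock) {
        this.commitMs = commitMs;
        this.clock = clock;
        // Start the interval at construction time instead of at 0, so the
        // first commit happens commitMs after startup rather than immediately.
        this.lastCommitMs = clock.getAsLong();
    }

    // Returns true if a commit (checkpoint) would happen on this iteration.
    boolean maybeCommit() {
        long now = clock.getAsLong();
        if (lastCommitMs + commitMs < now) {
            lastCommitMs = now;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        // Fake clock: a one-element array so the lambda can see updates.
        long[] nowMs = {1_000_000L};
        CommitTimerFixed timer = new CommitTimerFixed(60_000L, () -> nowMs[0]);
        System.out.println(timer.maybeCommit()); // false: no checkpoint on startup
        nowMs[0] += 60_001L;
        System.out.println(timer.maybeCommit()); // true: first checkpoint after commitMs
    }
}
```

The same change would apply to lastWindowMs. The lazy "if it's 0, set it to clock()" variant would have a similar effect, but initializing once in the constructor avoids an extra branch on every loop iteration.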
I noticed this by running a kafka-console-consumer against hello-samza. When the job started, I saw:
The first empty checkpoint happened immediately on startup, before any messages had been processed. Since the job was being started for the first time, OffsetManager.lastProcessedOffsets was empty, so it immediately checkpointed an empty map. This shouldn't result in data loss; it's just annoying and useless.