Description
If a container dies, Samza currently suspends processing of the input stream partitions assigned to that container until a new container has been brought up (which then resumes processing from the last checkpoint). That downtime can be substantial if the job has a lot of local state which needs to be restored.
If the application can tolerate such processing latency, that's not a problem. However, some jobs may have an SLA that requires them to always process messages with low latency, even if a container fails. For such jobs, it would be good to have the option of enabling "hot standby" containers, which can take over from a failed container as soon as a failure is detected.
The proposed implementation is for each active container to have a standby container (thus doubling the number of containers required). The standby container consumes the checkpoint stream and any changelog streams produced by its active counterpart. The standby looks much like a container that is being restored after a failure, except that it stays in restoration mode permanently and doesn't consume any messages directly from the input streams. This is similar to leader-based (master-slave) replication in many databases: a follower/slave continuously mirrors changes on the leader/master, but does not process any writes from clients.
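As a rough sketch of what the standby's main loop could look like (this is not Samza's actual API; StateStore, ChangelogConsumer and ChangelogEntry below are hypothetical stand-ins for the local key-value store and changelog consumer):

    import java.util.List;
    import java.util.concurrent.atomic.AtomicBoolean;

    interface StateStore { void put(byte[] key, byte[] value); }
    interface ChangelogConsumer { List<ChangelogEntry> poll(long timeoutMs); }
    record ChangelogEntry(byte[] key, byte[] value) {}

    class StandbyContainer {
        private final StateStore store;
        private final ChangelogConsumer changelog;
        private final AtomicBoolean promoted = new AtomicBoolean(false);

        StandbyContainer(StateStore store, ChangelogConsumer changelog) {
            this.store = store;
            this.changelog = changelog;
        }

        // The standby's "processing" is just perpetual restoration: apply every
        // changelog entry to the local store, exactly as a post-failure restore would.
        void run() {
            while (!promoted.get()) {
                for (ChangelogEntry entry : changelog.poll(100)) {
                    store.put(entry.key(), entry.value());
                }
            }
            // Local state is now (nearly) caught up; the container can switch to
            // consuming the input streams from the last checkpoint.
        }

        // Called when the active counterpart has failed and this standby takes over.
        void promote() { promoted.set(true); }
    }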
When an active container fails, its standby can be promoted to active (like failover in database replication). When thus instructed, the standby stops consuming the checkpoint and changelog streams, starts consuming the input streams from the most recent checkpoint, and starts producing output streams and changelogs. In the background, a new standby container can be fired up.
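A sketch of that failover sequence from the controller's point of view, using hypothetical interfaces (none of these names exist in Samza):

    interface ContainerHandle {
        void stopChangelogConsumption();   // stop mirroring checkpoint/changelog streams
        void resumeFromCheckpoint();       // start consuming input from the last checkpoint
        void startProducing();             // begin writing output and changelog streams
    }

    interface ClusterController {
        ContainerHandle standbyFor(String activeContainerId);
        void requestNewStandby(String activeContainerId);
    }

    class FailoverHandler {
        private final ClusterController controller;

        FailoverHandler(ClusterController controller) { this.controller = controller; }

        void onContainerFailure(String failedContainerId) {
            ContainerHandle standby = controller.standbyFor(failedContainerId);
            standby.stopChangelogConsumption();
            standby.resumeFromCheckpoint();
            standby.startProducing();
            // Replace the standby that was just promoted, in the background.
            controller.requestNewStandby(failedContainerId);
        }
    }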
Some care will be needed to avoid split-brain problems (two containers simultaneously believing that they are active, leading to input messages being consumed twice and output messages being duplicated). Perhaps a container must stop processing if it has not been able to successfully check in with a central controller node (e.g. the YARN AM) for some amount of time, and the controller must wait at least that amount of time before promoting a standby to active. Therefore this feature will probably require some direct RPC between containers and the YARN AM (or equivalent).
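One possible shape of such a fencing rule, sketched with a hypothetical heartbeat RPC and an arbitrary 30-second lease:

    import java.time.Duration;

    interface HeartbeatClient { boolean checkIn(String containerId); }  // RPC to the YARN AM

    class ActiveLease {
        private static final Duration LEASE = Duration.ofSeconds(30);  // example value

        private final HeartbeatClient am;
        private final String containerId;
        private long lastSuccessfulCheckInMs = System.currentTimeMillis();

        ActiveLease(HeartbeatClient am, String containerId) {
            this.am = am;
            this.containerId = containerId;
        }

        // Called by the active container before processing each batch of input.
        boolean mayProcess() {
            long now = System.currentTimeMillis();
            if (am.checkIn(containerId)) {
                lastSuccessfulCheckInMs = now;
            }
            // If the AM has been unreachable for longer than the lease, stop processing.
            // The AM, in turn, must wait at least LEASE after the last successful
            // check-in before promoting the standby, so at most one container
            // processes input at any time.
            return now - lastSuccessfulCheckInMs < LEASE.toMillis();
        }
    }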
This feature probably doesn't require any new user-facing APIs (from application code's point of view, a standby container looks like a container that is being restored); a single boolean configuration flag to enable hot standby should suffice.
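For illustration only, such a flag could be read roughly as follows; the property name below is made up and is not an existing Samza configuration key:

    import java.util.Map;

    class HotStandbyConfig {
        // Hypothetical property name; the real key would be settled in the design.
        static final String HOT_STANDBY_ENABLED = "job.container.hot-standby.enabled";

        static boolean isEnabled(Map<String, String> jobConfig) {
            return Boolean.parseBoolean(jobConfig.getOrDefault(HOT_STANDBY_ENABLED, "false"));
        }
    }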