Currently we close the stream thread in the following way:
0. Commit all tasks.
1. Close producer.
2. Close consumer.
3. Close restore consumer.
4. For each task, close its topology processors one-by-one following the topology order by calling processor.close().
5. For each task, close its state manager as well as flushing and closing all its associated state stores.
We choose to close the producer / consumer clients before shutting down the tasks because we need to make sure all sent records has been acked so that we have the right log-end-offset when closing the store and checkpointing the offset of the changelog. However there is also an issue with this ordering, in which users choose to write more records in their processor.close() calls, this will cause RTE since the producers has already been closed, and no changelog records will be able to write.
Thinking about this issue, a more appropriate ordering will be:
1. For each task, close their topology processors following the topology order by calling processor.close().
2. For each task, commit its state by calling task.commit(). At this time all sent records should be acked since producer.flush() is called.
3. For each task, close their ProcessorStateManager.
4. Close all embedded clients, i.e. producer / consumer / restore consumer.