Affects Version/s: None
Fix Version/s: 0.14.0
Operators in the High Level API need globally unique IDs so that we can:
1. Create intermediate topics (e.g. for partitionBy).
2. Create operator state stores and their changelog topics.
3. Start processing from particular points in the DAG (multi-stage).
Ideally, a particular operator's ID should be:
1. The same across job restarts with no code changes.
2. The same despite functionality-preserving changes in topology (e.g. splitting a map into two maps).
3. Different if the semantics of an operator change without a change in topology (e.g. a change in the map function logic).
Requirement 1 is satisfied by a topology-based ID, but requirements 2 and 3 are difficult for the framework to determine automatically. All three are required to ensure correctness of durable intermediate state in the DAG across restarts with code changes. In other words, users need to be able to handle the following scenarios:
1. If they make a code change that preserves the topology but changes the semantics, they should be able to indicate that the downstream stateful operators are now semantically different from before and the previous state should be invalidated.
2. If they make a code change that changes the topology but preserves the semantics, they should be able to indicate that the downstream stateful operators are still the same as earlier and the previous state should be reused.
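To illustrate why a topology-based ID covers requirement 1 but not requirement 2, here is a minimal sketch. The class `TopologyIds` and the `<position>-<kind>` ID format are hypothetical and are not the framework's actual scheme; the sketch also assumes a simple linear DAG.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: derives each operator's ID purely from its position
// in a linear DAG. Not the framework's real ID scheme.
final class TopologyIds {
    // Assumed ID format for illustration: "<position>-<operator kind>".
    static List<String> assign(List<String> operatorKinds) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < operatorKinds.size(); i++) {
            ids.add(i + "-" + operatorKinds.get(i));
        }
        return ids;
    }

    public static void main(String[] args) {
        // Same code, same topology: IDs are identical on every restart.
        List<String> before = assign(Arrays.asList("map", "partitionBy", "window"));
        // The map is split into two maps: semantics are unchanged, but every
        // downstream operator shifts position and therefore gets a new ID.
        List<String> after = assign(Arrays.asList("map", "map", "partitionBy", "window"));
        System.out.println("before: " + before);
        System.out.println("after:  " + after);
    }
}
```

In this sketch the partitionBy and window operators receive different IDs after the split even though their semantics are unchanged, so any state keyed by those IDs would be orphaned. A pure topology-based ID also cannot detect a change in map function logic, since the positions stay the same.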
To support these two scenarios, we will allow (and recommend) users to provide a custom ID for stateful operators (partitionBy, window and join operators) that is unique in the DAG. We will use this ID as part of the name for any durable state (streams, stores, changelogs) associated with the operator. Whenever users make a code change that changes the topology or semantics of the application, they should identify any affected operators downstream of the change and change or preserve their operator IDs accordingly.
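A rough sketch of how a user-provided ID could feed into the names of durable state follows. The class `OperatorStateNames`, the `<jobName>-<jobId>-<opId>` naming pattern, and the `-changelog` suffix are assumptions made for illustration; the ticket does not specify the actual naming format.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: builds durable state names from a user-provided
// operator ID and rejects IDs that are not unique within the DAG.
final class OperatorStateNames {
    private final String jobName;
    private final String jobId;
    private final Set<String> usedOpIds = new HashSet<>();

    OperatorStateNames(String jobName, String jobId) {
        this.jobName = jobName;
        this.jobId = jobId;
    }

    // Registers a user-provided ID, enforcing uniqueness in the DAG.
    String register(String opId) {
        if (!usedOpIds.add(opId)) {
            throw new IllegalArgumentException("Duplicate operator ID: " + opId);
        }
        return opId;
    }

    // Assumed naming pattern: "<jobName>-<jobId>-<opId>".
    String storeName(String opId) {
        return jobName + "-" + jobId + "-" + opId;
    }

    // Assumed suffix for the store's changelog topic.
    String changelogTopic(String opId) {
        return storeName(opId) + "-changelog";
    }

    public static void main(String[] args) {
        OperatorStateNames names = new OperatorStateNames("page-views", "1");
        String joinId = names.register("join-orders");
        System.out.println(names.storeName(joinId));
        System.out.println(names.changelogTopic(joinId));
    }
}
```

Under this scheme, keeping the ID `join-orders` across a topology change resolves to the same store and changelog, so the previous state is reused. Renaming it (for example to `join-orders-v2`) resolves to fresh names, which effectively invalidates the previous state.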