Details
Type: Bug
Status: Open
Priority: Not a Priority
Resolution: Unresolved
Affects Version/s: 1.8.1, 1.9.0
Fix Version/s: None
Description
The Kafka producer's transaction IDs are generated only once, when there is no previous state for that operator. If we restore and increase the parallelism (scale-out), some operators may have no previous state and will therefore create new IDs. If we also reduce the poolSize at the same time, these new IDs may overlap with the old ones, which should never happen! Similarly, a scale-in combined with an increased poolSize could lead to the same thing.
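To make the overlap concrete, here is a minimal, self-contained sketch. It assumes IDs are numbered as base + subtaskIndex * poolSize + i; that numbering is an assumption about TransactionalIdsGenerator and is not stated in this report. The class name, the helper methods, and the base offsets (0 before the restore, 9 after) are hypothetical values chosen for illustration only.

```java
import java.util.Set;
import java.util.TreeSet;

// Simplified model of the ID numbering; NOT the actual Flink implementation.
class TransactionalIdOverlapSketch {

    // Assumed scheme: base + subtaskIndex * poolSize + i, for i in [0, poolSize).
    static Set<Long> idsFor(long base, int subtaskIndex, int poolSize) {
        Set<Long> ids = new TreeSet<>();
        for (int i = 0; i < poolSize; i++) {
            ids.add(base + (long) subtaskIndex * poolSize + i);
        }
        return ids;
    }

    // IDs that appear in both sets.
    static Set<Long> overlap(Set<Long> a, Set<Long> b) {
        Set<Long> common = new TreeSet<>(a);
        common.retainAll(b);
        return common;
    }

    public static void main(String[] args) {
        // Before the restore: parallelism 2, poolSize 10, hypothetical base 0.
        // These IDs live in operator state and are reused after the restore.
        Set<Long> restored = new TreeSet<>();
        restored.addAll(idsFor(0, 0, 10)); // subtask 0 holds 0..9
        restored.addAll(idsFor(0, 1, 10)); // subtask 1 holds 10..19

        // After the restore: parallelism 3, poolSize 3, hypothetical base 9.
        // Subtask 2 has no previous state, so it generates fresh IDs.
        Set<Long> fresh = idsFor(9, 2, 3); // 9 + 2 * 3 + i = 15, 16, 17

        System.out.println("Overlapping IDs: " + overlap(restored, fresh));
    }
}
```

Under these assumptions, the fresh IDs of the new subtask 2 fall inside the range that subtask 1 restored from state, which is exactly the situation described above.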
An easy "fix" for this would be to forbid changing the poolSize. We could potentially do a bit better by forbidding only those changes that can lead to transaction ID overlaps, which we can identify from the formulae that TransactionalIdsGenerator uses. This should probably be the first step, and it can also be back-ported to older Flink versions just in case.
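The simplest variant of that guard could look roughly like the sketch below. It assumes the previously used poolSize is available on restore (for example, recorded in state); this report does not say whether that is currently the case. The class and method names are hypothetical and not part of Flink.

```java
// Hypothetical guard; not an existing Flink API.
class PoolSizeGuardSketch {

    // Rejects a restore when the configured poolSize differs from the one
    // the restored transaction IDs were generated with.
    static void checkPoolSizeUnchanged(int restoredPoolSize, int configuredPoolSize) {
        if (restoredPoolSize != configuredPoolSize) {
            throw new IllegalStateException(
                    "Changing the producer poolSize on restore is not supported: restored="
                            + restoredPoolSize + ", configured=" + configuredPoolSize);
        }
    }

    public static void main(String[] args) {
        checkPoolSizeUnchanged(5, 5); // same poolSize: passes silently
        try {
            checkPoolSizeUnchanged(5, 3); // reduced poolSize: rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A finer-grained check would compare the ID ranges that the old and new settings can produce and reject only the combinations that intersect, at the cost of depending on the generator's exact formulae.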
On a side note, the current scheme also relies on the operator's list state distributing previous states during scale-out in such a way that only the operators with the highest subtask indices do not get a previous state. This is somewhat "guaranteed" by OperatorStateStore#getListState(), but I'm not sure whether we should actually rely on that.