Luckily this bug was caught by RegexSourceIntegrationTest#testRegexRecordsAreProcessedAfterReassignment – we saw it fail with an NPE during SourceNode#deserializeKey, implying that the key deserializer was null which in turns implies that the source node was never initialized.
This can happen when a task is updated with new regex matched topic partitions. In order to update the topology with the new input partitions, we actually just create an entirely new topology from scratch including building new source node objects. We then re/initialize this new topology once the task is resumed.
The problem is that the task's RecordQueues save a reference to the corresponding source node, and use this to pass polled records into the topology. But the RecordQueues aren't updated with the newly built source nodes and still point to the original nodes.
If the task had not completed restoration before being updated with new partitions, it would never have initialized the original topology or source nodes, resulting in an NPE when the RecordQueue passes a record to the old, uninitialized source node.
This is the only specific known bug, but I haven't checked the entire code base so it's possible there are other node references saved that might result in bugs. We should try and avoid rebuilding an entirely new topology if at all possible, and see if we can just update the input partitions only where necessary