The attempt to read the per-key-group header bytes here does not guarantee that the header bytes are fully read:
What could happen is the following:
- Say the input stream does contain the header bytes.
- Fewer than HEADER_BYTES.length bytes are read into the read buffer in the referenced code line above.
- The if (bytesRead > 0 && !Arrays.equals(header, HEADER_BYTES)) check evaluates to true, because the partially filled byte array does not equal the expected header bytes.
- We would mistakenly conclude that the header bytes are not in the input stream and push back what was read. The header bytes would therefore not be skipped, and subsequent reads would see the header bytes first.
- Most importantly, because the header bytes are not skipped in this case, KeyGroupStream.readFrom(...) incorrectly reads the STATEFUN_VERSION (which is 0) as the number of feedback elements to read: https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/KeyGroupStream.java#L57
- The end result in this scenario is that some checkpointed feedback events are silently dropped.
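To make the scenario concrete, here is a minimal, self-contained Java sketch of the failure. It assumes a hypothetical 4-byte HEADER_BYTES value and models the check with a hypothetical singleReadCheckPushesBack method; neither is the actual StateFun code. OneBytePerReadInputStream is also hypothetical: it returns at most one byte per bulk read, which the InputStream contract permits.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.util.Arrays;

public class PartialHeaderReadDemo {

  // Hypothetical header value; the real HEADER_BYTES constant in StateFun may differ.
  static final byte[] HEADER_BYTES = {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE};

  /** A legal (hypothetical) InputStream that hands out at most one byte per bulk read. */
  static final class OneBytePerReadInputStream extends FilterInputStream {
    OneBytePerReadInputStream(InputStream in) {
      super(in);
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
      // The InputStream contract only promises at least one byte (or -1 at EOF).
      return super.read(b, off, Math.min(len, 1));
    }
  }

  /** Wraps the given bytes in a pushback stream whose bulk reads are short. */
  static PushbackInputStream newStream(byte[] data) {
    return new PushbackInputStream(
        new OneBytePerReadInputStream(new ByteArrayInputStream(data)), HEADER_BYTES.length);
  }

  /**
   * Single-read check modeled on the one described above (a reconstruction, not the
   * StateFun source). Returns true if it concluded the header is absent and pushed back.
   */
  static boolean singleReadCheckPushesBack(PushbackInputStream in) throws IOException {
    byte[] header = new byte[HEADER_BYTES.length];
    int bytesRead = in.read(header); // may legally return fewer than header.length bytes
    if (bytesRead > 0 && !Arrays.equals(header, HEADER_BYTES)) {
      in.unread(header, 0, bytesRead); // header wrongly treated as absent
      return true;
    }
    return false;
  }

  public static void main(String[] args) throws IOException {
    // Header followed by a hypothetical trailing byte.
    byte[] data = {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE, 0x00};
    PushbackInputStream in = newStream(data);
    System.out.println("pushed back: " + singleReadCheckPushesBack(in)); // prints "pushed back: true"
    // The next read sees the first header byte again instead of the byte after the header.
    System.out.println("next byte: 0x" + Integer.toHexString(in.read())); // prints "next byte: 0xca"
  }
}
```

Even though the stream does start with the header, the first bulk read returns a single byte, the comparison fails, and the header is left in the stream for the following reads.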
Although it is hard to say how likely this is in practice, and it also depends on the actual InputStream implementation, the general contract of InputStream#read(byte[]) allows it, so this should definitely be addressed.
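One possible fix, sketched under the same assumptions (hypothetical HEADER_BYTES value and hypothetical helper names, not the actual StateFun code), is to keep reading until either HEADER_BYTES.length bytes have been read or the stream ends, compare only a complete buffer, and push back exactly the bytes that were consumed.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.util.Arrays;

public class HeaderSkipFix {

  // Hypothetical header value; the real HEADER_BYTES constant in StateFun may differ.
  static final byte[] HEADER_BYTES = {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE};

  /** Hypothetical helper: consumes the header if present, otherwise restores what it read. */
  static boolean skipHeaderIfPresent(PushbackInputStream in) throws IOException {
    byte[] header = new byte[HEADER_BYTES.length];
    int total = 0;
    // A single read() may be short, so loop until the buffer is full or the stream ends.
    while (total < header.length) {
      int n = in.read(header, total, header.length - total);
      if (n == -1) {
        break; // EOF before a complete header, so the header cannot be present
      }
      total += n;
    }
    if (total == header.length && Arrays.equals(header, HEADER_BYTES)) {
      return true; // full header matched and consumed
    }
    if (total > 0) {
      in.unread(header, 0, total); // push back exactly the bytes that were read
    }
    return false;
  }

  /** Hypothetical stream returning at most one byte per bulk read, to exercise the fix. */
  static final class OneBytePerReadInputStream extends FilterInputStream {
    OneBytePerReadInputStream(InputStream in) {
      super(in);
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
      return super.read(b, off, Math.min(len, 1));
    }
  }

  static PushbackInputStream newStream(byte[] data) {
    return new PushbackInputStream(
        new OneBytePerReadInputStream(new ByteArrayInputStream(data)), HEADER_BYTES.length);
  }

  public static void main(String[] args) throws IOException {
    byte[] withHeader = {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE, 0x07};
    PushbackInputStream in = newStream(withHeader);
    System.out.println("header skipped: " + skipHeaderIfPresent(in)); // prints "header skipped: true"
    System.out.println("next byte: " + in.read()); // prints "next byte: 7"
  }
}
```

On Java 9+, InputStream#readNBytes(byte[], int, int) performs the same loop and returns the number of bytes read. DataInputStream#readFully is less convenient here: it throws EOFException on a short stream without reporting how many bytes it consumed, which makes an exact pushback harder.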