Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- 1.15.3
Description
Currently the HeapStateBackend checks whether the current key group index is valid, while the RocksDBStateBackend does not. With the HeapStateBackend, if the user uses a non-deterministic shuffle key, an exception is thrown as follows:
java.lang.IllegalArgumentException: Key group 84 is not in KeyGroupRange{startKeyGroup=32, endKeyGroup=63}. Unless you're directly using low level state access APIs, this is most likely caused by non-deterministic shuffle key (hashCode and equals implementation).
    at org.apache.flink.runtime.state.KeyGroupRangeOffsets.newIllegalKeyGroupException(KeyGroupRangeOffsets.java:37)
    at org.apache.flink.runtime.state.heap.StateTable.getMapForKeyGroup(StateTable.java:305)
    at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:261)
    at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:143)
    at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:72)
    at com.alibaba.ververica.flink.state.benchmark.wordcount.WordCount$MixedFlatMapper.flatMap(WordCount.java:169)
    at com.alibaba.ververica.flink.state.benchmark.wordcount.WordCount$MixedFlatMapper.flatMap(WordCount.java:160)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:135)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:106)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:526)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:811)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:760)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:954)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:933)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
    at java.lang.Thread.run(Thread.java:750)
However, the RocksDBStateBackend runs without throwing an exception. A wrong key group index causes state correctness problems, so it is better to perform the check in InternalKeyContextImpl#setCurrentKeyGroupIndex and throw an exception immediately.
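A minimal sketch of the proposed check, for illustration only. The classes KeyGroupRangeSketch and KeyContextSketch are hypothetical, simplified stand-ins and not the actual Flink classes or the actual patch; only the idea of validating the index inside setCurrentKeyGroupIndex is taken from the description above.

```java
// Hypothetical stand-in for a key group range; NOT the real Flink class.
final class KeyGroupRangeSketch {
    private final int startKeyGroup; // inclusive
    private final int endKeyGroup;   // inclusive

    KeyGroupRangeSketch(int startKeyGroup, int endKeyGroup) {
        this.startKeyGroup = startKeyGroup;
        this.endKeyGroup = endKeyGroup;
    }

    boolean contains(int keyGroup) {
        return keyGroup >= startKeyGroup && keyGroup <= endKeyGroup;
    }

    @Override
    public String toString() {
        return "KeyGroupRange{startKeyGroup=" + startKeyGroup
                + ", endKeyGroup=" + endKeyGroup + "}";
    }
}

// Hypothetical stand-in for a key context shared by all state backends.
final class KeyContextSketch {
    private final KeyGroupRangeSketch keyGroupRange;
    private int currentKeyGroupIndex = -1; // -1 means "not set yet"

    KeyContextSketch(KeyGroupRangeSketch keyGroupRange) {
        this.keyGroupRange = keyGroupRange;
    }

    // Validate before storing, so every backend fails fast in the same way
    // instead of silently reading or writing state under a wrong key group.
    void setCurrentKeyGroupIndex(int keyGroupIndex) {
        if (!keyGroupRange.contains(keyGroupIndex)) {
            throw new IllegalArgumentException(
                    "Key group " + keyGroupIndex + " is not in " + keyGroupRange + ".");
        }
        this.currentKeyGroupIndex = keyGroupIndex;
    }

    int getCurrentKeyGroupIndex() {
        return currentKeyGroupIndex;
    }
}

final class KeyGroupCheckDemo {
    public static void main(String[] args) {
        KeyContextSketch ctx = new KeyContextSketch(new KeyGroupRangeSketch(32, 63));
        ctx.setCurrentKeyGroupIndex(40); // inside the range, accepted
        try {
            ctx.setCurrentKeyGroupIndex(84); // outside the range, rejected
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Placing the check in the shared key context, rather than in one backend's state table, means the failure no longer depends on which state backend is configured.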
Issue Links
- is blocked by
  - FLINK-29437 The partition of data before and after the Kafka Shuffle are not aligned (Resolved)