Details
Type: Bug
Status: Open
Priority: Minor
Resolution: Unresolved
Affects Version: 1.15.0
Environment:
- Java 1.8 (openjdk 1.8.0_322)
- Scala 2.12.15
- Flink 1.15.0
Description
When a stream is keyed with keyBy, java.time.OffsetDateTime values are (de-)serialized incorrectly: the offset gets lost and becomes null.
Here's a minimal non-working example:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import java.time.OffsetDateTime

object MWE {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env
      .fromElements("2022-07-03T15:35:48.142Z", "2022-07-03T15:35:48.438Z")
      .map(OffsetDateTime.parse(_))
      .keyBy((t: OffsetDateTime) => t)
      .print()
    env.execute()
  }
}
Expected Output:
2022-07-03T15:35:48.438Z
2022-07-03T15:35:48.142Z
Actual Output:
2022-07-03T15:35:48.438null
2022-07-03T15:35:48.142null
The issue arises whenever keyBy and OffsetDateTime are used together; I believe it is related to how Flink serializes records, since keyBy repartitions the stream and therefore forces every record through a serializer.
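To narrow this down, it may help to check which serializer Flink selects for OffsetDateTime. The following is only a diagnostic sketch: the object name InspectOffsetDateTimeSerializer is hypothetical, and it assumes that the type information Flink derives inside the job matches what TypeInformation.of returns for the class. If a generic type is reported, records of that type are handled by Flink's Kryo fallback.

```scala
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
import java.time.OffsetDateTime

// Hypothetical helper: prints the type information and serializer that
// Flink resolves for OffsetDateTime with a default ExecutionConfig.
object InspectOffsetDateTimeSerializer {
  def main(args: Array[String]): Unit = {
    val info = TypeInformation.of(classOf[OffsetDateTime])
    val serializer = info.createSerializer(new ExecutionConfig)
    println(s"type information: ${info.getClass.getName}")
    println(s"serializer:       ${serializer.getClass.getName}")
  }
}
```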
A workaround is to register a custom Kryo serializer that simply round-trips the value through toString and parse.
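A minimal sketch of what such a serializer could look like, assuming the Kryo 2.x API that Flink 1.15 bundles. The names OffsetDateTimeSerializer and OffsetDateTimeWorkaround are hypothetical and chosen only for illustration:

```scala
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import java.time.OffsetDateTime

// Hypothetical serializer: writes the ISO-8601 string form and parses it back,
// so the offset is carried along as part of the string.
class OffsetDateTimeSerializer extends Serializer[OffsetDateTime] with Serializable {
  override def write(kryo: Kryo, output: Output, value: OffsetDateTime): Unit =
    output.writeString(value.toString)

  override def read(kryo: Kryo, input: Input, tpe: Class[OffsetDateTime]): OffsetDateTime =
    OffsetDateTime.parse(input.readString())
}

// Hypothetical helper: registers the serializer on the job's ExecutionConfig.
object OffsetDateTimeWorkaround {
  def register(env: StreamExecutionEnvironment): Unit =
    env.getConfig.registerTypeWithKryoSerializer(
      classOf[OffsetDateTime],
      classOf[OffsetDateTimeSerializer]
    )
}
```

In the example above, the registration would go right after the environment is created, for instance OffsetDateTimeWorkaround.register(env), before the stream is defined.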
However, I feel this should work out of the box.