Flink / FLINK-28367

OffsetDateTime does not work with keyBy

Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 1.15.0
    • None
    • None
    • Environment:
      • Java 1.8 (openjdk 1.8.0_322)
      • Scala 2.12.15
      • Flink 1.15.0

    Description

      Using keyBy (de-)serializes java.time.OffsetDateTime values incorrectly: the offset is lost and becomes null.

      Here's a minimal non-working example:

       

      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
      
      import java.time.OffsetDateTime
      
      object MWE {
        def main(args: Array[String]): Unit = {
          val env = StreamExecutionEnvironment.getExecutionEnvironment
      
          env
            .fromElements("2022-07-03T15:35:48.142Z", "2022-07-03T15:35:48.438Z")
            .map(OffsetDateTime.parse(_))
            .keyBy((t: OffsetDateTime) => t)
            .print()
      
          env.execute()
        }
      } 

       

      Expected Output:

      2022-07-03T15:35:48.438Z
      2022-07-03T15:35:48.142Z

      Actual Output:

      2022-07-03T15:35:48.438null
      2022-07-03T15:35:48.142null

      The issue arises whenever keyBy and OffsetDateTime are combined; I believe it is related to the way Flink serializes the state.
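      One way to probe the serialization suspicion is to ask Flink which type information it derives for OffsetDateTime. The following is a minimal sketch, assuming Flink 1.15 is on the classpath; the object name SerializerCheck and the helper isGenericType are made up for illustration. If the check prints true, values of that class are handled as a generic type, which would be consistent with a Kryo-related cause, but the snippet itself does not prove where the offset is lost.

```scala
import java.time.OffsetDateTime

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.GenericTypeInfo

// Hypothetical helper, not part of Flink.
object SerializerCheck {
  // True when Flink derives GenericTypeInfo for the class,
  // i.e. it has no dedicated type information for it.
  def isGenericType[T](clazz: Class[T]): Boolean =
    TypeInformation.of(clazz).isInstanceOf[GenericTypeInfo[_]]

  def main(args: Array[String]): Unit = {
    // Print the derived type information and whether it is the generic fallback.
    println(TypeInformation.of(classOf[OffsetDateTime]))
    println(isGenericType(classOf[OffsetDateTime]))
  }
}
```

      In a job, calling env.getConfig.disableGenericTypes() is another way to surface this: Flink then throws when it meets a type that would go through Kryo instead of silently serializing it.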


        Activity

          _hl_ Henrik Laxhuber added a comment -

          A workaround is to register a custom Kryo serializer that simply uses toString and parse:

          import java.time.OffsetDateTime
          
          import com.esotericsoftware.kryo.io.{Input, Output}
          import com.esotericsoftware.kryo.{Kryo, Serializer}
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
          
          // Serializes OffsetDateTime as its ISO-8601 string, which keeps the offset.
          class FixedOffsetDateTimeSerializer
              extends Serializer[OffsetDateTime]
              with Serializable {
            override def write(kryo: Kryo, output: Output, obj: OffsetDateTime): Unit =
              output.writeString(obj.toString)
          
            override def read(
                kryo: Kryo,
                input: Input,
                `type`: Class[OffsetDateTime]
            ): OffsetDateTime =
              OffsetDateTime.parse(input.readString())
          }
          
          object MWE {
            def main(args: Array[String]): Unit = {
              val env = StreamExecutionEnvironment.getExecutionEnvironment
              // Use the custom serializer whenever Kryo handles an OffsetDateTime.
              env.getConfig.registerTypeWithKryoSerializer[FixedOffsetDateTimeSerializer](
                classOf[OffsetDateTime],
                new FixedOffsetDateTimeSerializer()
              )
          
              env
                .fromElements("2022-07-03T15:35:48.142Z", "2022-07-03T15:35:48.438Z")
                .map(OffsetDateTime.parse(_))
                .keyBy((t: OffsetDateTime) => t)
                .print()
          
              env.execute()
            }
          }

          But I feel like this should work out of the box.

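          The workaround relies on toString followed by parse reproducing the original value, offset included. A minimal sketch of that round trip, using only the JDK (no Flink or Kryo involved; the object name RoundTripCheck and the second sample timestamp are made up for illustration):

```scala
import java.time.OffsetDateTime

// Hypothetical helper, not part of Flink or Kryo.
object RoundTripCheck {
  // Mirrors the serializer's write/read pair: encode with toString, decode with parse.
  def roundTrip(t: OffsetDateTime): OffsetDateTime =
    OffsetDateTime.parse(t.toString)

  def main(args: Array[String]): Unit = {
    val samples = Seq("2022-07-03T15:35:48.142Z", "2022-07-03T15:35:48.438+02:00")
      .map(OffsetDateTime.parse(_))

    samples.foreach { t =>
      val back = roundTrip(t)
      // Both the instant and the offset must survive the round trip.
      assert(back == t)
      assert(back.getOffset == t.getOffset)
    }
  }
}
```

          The string form is larger than a binary encoding would be, so this trades some space for simplicity.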
          qingwei91 Lim Qing Wei added a comment - edited

          I had a look. I think part of the problem is that Flink uses a very old version of Kryo; this issue is fixed in newer versions.

           

          One easy fix is to add a newer Kryo version to your dependencies, as documented here: https://github.com/EsotericSoftware/kryo#with-maven

          If you're using sbt, adding the following to your dependencies should suffice. It fixes the issue for me (using your example, thanks for that):

          "com.esotericsoftware" % "kryo" % "5.3.0"

           

          I think it might be a good idea to bump Flink's Kryo version, but I don't know what that entails. Is the upgrade going to be painful because we depend on many obsolete APIs? It might be good to get contributors' input here and have it tracked in a dedicated ticket.

          martijnvisser Martijn Visser added a comment -

          We "can't" bump Flink's Kryo version, see FLINK-3154


          _hl_ Henrik Laxhuber added a comment -

          Thanks, Lim, for identifying the root cause!

          Without having a deep understanding of Flink: wouldn't it be possible to run multiple versions of Kryo in parallel, and default to the newer one (with proper versioning in savepoints) for new projects?

          martijnvisser Martijn Visser added a comment -

          _hl_ Unfortunately not. It would be a welcome addition, though.

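          The idea raised in the thread, keeping an old and a new serialization format side by side and recording which one wrote the data, can be illustrated in isolation. The following is only a sketch under stated assumptions: it does not use Flink, Kryo, or savepoints, every name in it (VersionedCodec, V1, V2, writeLegacy) is hypothetical, and the "legacy" format that drops the offset and is read back as UTC is an invented stand-in for an old serializer.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.time.{LocalDateTime, OffsetDateTime, ZoneOffset}

// Hypothetical illustration only: none of these names exist in Flink or Kryo.
object VersionedCodec {
  val V1: Byte = 1 // invented legacy format: local date-time only, offset not stored
  val V2: Byte = 2 // newer format: full ISO-8601 string including the offset

  // Writes a one-byte version tag followed by the payload string.
  private def encode(version: Byte, payload: String): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeByte(version)
    out.writeUTF(payload)
    out.flush()
    bytes.toByteArray
  }

  // New data is always written in the newest format.
  def write(t: OffsetDateTime): Array[Byte] = encode(V2, t.toString)

  // Simulates data that was written by the legacy format.
  def writeLegacy(t: OffsetDateTime): Array[Byte] =
    encode(V1, t.toLocalDateTime.toString)

  // Reads the version tag first, then dispatches to the matching decoder.
  def read(data: Array[Byte]): OffsetDateTime = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    in.readByte() match {
      case V1 => LocalDateTime.parse(in.readUTF()).atOffset(ZoneOffset.UTC) // assumption: legacy data is UTC
      case V2 => OffsetDateTime.parse(in.readUTF())
      case other => throw new IllegalArgumentException(s"Unknown format version: $other")
    }
  }
}
```

          The design point is that the reader never guesses: old data stays readable through its own decoder while new data gets the fixed format. Whether something comparable is feasible inside Flink is exactly what the replies above leave open.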

          People

            Assignee: Unassigned
            Reporter: _hl_ Henrik Laxhuber
            Votes: 0
            Watchers: 5
