Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-26643

EOFException throws when use TypeInformationKeyValueSerializationSchema to deserialize data from kafka

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 1.13.2, 1.14.3
    • None
    • API / Core
    • None

    Description

      Recently , I use TypeInformationKeyValueSerializationSchema  to deserialize data from kafka, it throws java.io.EOFException .

       

      Useage in code: 

      setDeserializer(KafkaRecordDeserializationSchema.of(new TypeInformationKeyValueSerializationSchema (String.class, String.class, new ExecutionConfig())));

       

      Data in Kafka is like:

      key(string):2 22 6F F0 A8 8B 69 E 61 22 A5 E7 B5 50

      value(json string):

      { "timestamp":"2022-3-11 17:9", "deviceNumber":"22 6F F0 A8 8B 69 E 61 22 A5 E7 B5 50 ", "eventId":"830500" }

       

      After  trouble shooting, I think follwing API is the root cause:

      org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte()

      public int readUnsignedByte () throws IOException {
          if (this.position < this.end) {
              return (this.buffer[this.position++] & 0xff);
          } else {
              throw new EOFException();
          }
      }

       

      Obviously, it is wrong to throw an exception when (this.positon == this.end). 

      It just means finishing to read unsigned byte when  (this.positon == this.end), nothing need to do.

       

       

      Attachments

        Activity

          People

            Unassigned Unassigned
            Aeternus Hengtai Nie
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

              Created:
              Updated: