Details
Bug
Status: Open
Major
Resolution: Unresolved
1.13.2, 1.14.3
None
None
Description
Recently, I used TypeInformationKeyValueSerializationSchema to deserialize data from Kafka, and it threw java.io.EOFException.
Usage in code:
setDeserializer(KafkaRecordDeserializationSchema.of(new TypeInformationKeyValueSerializationSchema<>(String.class, String.class, new ExecutionConfig())));
The data in Kafka looks like this:
key(string):2 22 6F F0 A8 8B 69 E 61 22 A5 E7 B5 50
value(json string):
{ "timestamp":"2022-3-11 17:9", "deviceNumber":"22 6F F0 A8 8B 69 E 61 22 A5 E7 B5 50 ", "eventId":"830500" }
After troubleshooting, I think the following API is the root cause:
org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte()
public int readUnsignedByte() throws IOException {
    if (this.position < this.end) {
        return (this.buffer[this.position++] & 0xff);
    } else {
        throw new EOFException();
    }
}
Obviously, it is wrong to throw an exception when (this.position == this.end).
That condition only means that all of the bytes in the buffer have already been read, so nothing more needs to be done.
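The condition described above can be reproduced without Kafka or Flink. The following is only a minimal sketch: TinyInput is a hypothetical stand-in that copies the position/end check from the snippet above, it is not Flink's DataInputDeserializer, and readOrMinusOne is a helper invented here purely to make the outcome visible.

```java
import java.io.EOFException;
import java.io.IOException;

public class ReadUnsignedByteSketch {

    /** Hypothetical stand-in for a buffer-backed reader; NOT Flink's class. */
    static final class TinyInput {
        private final byte[] buffer;
        private final int end;
        private int position;

        TinyInput(byte[] buffer) {
            this.buffer = buffer;
            this.end = buffer.length;
            this.position = 0;
        }

        /** Same check as in the snippet above: throws once position == end. */
        int readUnsignedByte() throws IOException {
            if (position < end) {
                return buffer[position++] & 0xff;
            } else {
                throw new EOFException();
            }
        }
    }

    /**
     * Demo helper (an assumption of this sketch): returns the byte read,
     * or -1 if the read hit EOFException.
     */
    static int readOrMinusOne(TinyInput in) {
        try {
            return in.readUnsignedByte();
        } catch (IOException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        // One-byte buffer: the first read succeeds, the second one fails.
        TinyInput in = new TinyInput(new byte[] {(byte) 0xF0});
        System.out.println(readOrMinusOne(in)); // 240
        System.out.println(readOrMinusOne(in)); // -1 (EOFException was thrown)
    }
}
```

With a one-byte buffer, the first call returns the byte and the second call takes the else branch, which is the situation (this.position == this.end) discussed above.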