Details
Type: Bug
Status: Resolved
Priority: Blocker
Resolution: Fixed
Version: cdc-3.1.1
Description
Currently, DebeziumEventDeserializationSchema deserializes each record using a schema inferred from that record itself:
    private RecordData extractDataRecord(Struct value, Schema valueSchema) throws Exception {
        DataType dataType = schemaDataTypeInference.infer(value, valueSchema);
        return (RecordData) getOrCreateConverter(dataType).convert(value, valueSchema);
    }
There are some issues:
- Inferring the type and looking up or creating a converter for every incoming record adds per-record overhead.
- A type inferred from a single record might not reflect the real table schema. For instance, a MySQL timestamp column with precision 6 can hold a value whose fractional seconds are zero. The type inferred from that value has precision 0.
    protected DataType inferString(Object value, Schema schema) {
        if (ZonedTimestamp.SCHEMA_NAME.equals(schema.name())) {
            int nano =
                    Optional.ofNullable((String) value)
                            .map(s -> ZonedTimestamp.FORMATTER.parse(s, Instant::from))
                            .map(Instant::getNano)
                            .orElse(0);

            int precision;
            if (nano == 0) {
                precision = 0;
            } else if (nano % 1000 > 0) {
                precision = 9;
            } else if (nano % 1000_000 > 0) {
                precision = 6;
            } else if (nano % 1000_000_000 > 0) {
                precision = 3;
            } else {
                precision = 0;
            }
            return DataTypes.TIMESTAMP_LTZ(precision);
        }
        return DataTypes.STRING();
    }
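To make the problem concrete, here is a minimal standalone sketch of the nanosecond-based branch of inferString. The class name PrecisionInferenceDemo and the sample values are hypothetical, and it assumes ZonedTimestamp.FORMATTER behaves like DateTimeFormatter.ISO_OFFSET_DATE_TIME. Two values from the same TIMESTAMP(6) column can yield different inferred precisions:

```java
import java.time.Instant;
import java.time.format.DateTimeFormatter;

// Hypothetical demo class; it only mirrors the precision logic quoted above.
final class PrecisionInferenceDemo {

    // Assumption: the real formatter is equivalent to ISO_OFFSET_DATE_TIME.
    static int inferPrecision(String isoTimestamp) {
        int nano =
                DateTimeFormatter.ISO_OFFSET_DATE_TIME
                        .parse(isoTimestamp, Instant::from)
                        .getNano();
        if (nano == 0) {
            return 0; // no fractional seconds in this particular value
        } else if (nano % 1_000 > 0) {
            return 9; // sub-microsecond digits present
        } else if (nano % 1_000_000 > 0) {
            return 6; // sub-millisecond digits present
        } else {
            return 3; // only millisecond digits present
        }
    }

    public static void main(String[] args) {
        // Both values could come from the same TIMESTAMP(6) column.
        System.out.println(inferPrecision("2024-01-01T00:00:00Z"));
        System.out.println(inferPrecision("2024-01-01T00:00:00.123456Z"));
    }
}
```

The first value is inferred as precision 0 and the second as precision 6, although the column type never changed.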
However, timestamps with different precisions are stored in different binary formats in BinaryRecordData. Writing a timestamp with precision 0 and then reading it back with precision 6 throws an exception.
    // org.apache.flink.cdc.common.data.binary.BinaryRecordData#getTimestamp
    @Override
    public TimestampData getTimestamp(int pos, int precision) {
        assertIndexIsValid(pos);

        if (TimestampData.isCompact(precision)) {
            return TimestampData.fromMillis(segments[0].getLong(getFieldOffset(pos)));
        }

        int fieldOffset = getFieldOffset(pos);
        final long offsetAndNanoOfMilli = segments[0].getLong(fieldOffset);
        return BinarySegmentUtils.readTimestampData(segments, offset, offsetAndNanoOfMilli);
    }
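The mismatch can be illustrated with a simplified, hypothetical model of the two layouts. This is not the real BinaryRecordData code: the class TimestampLayoutDemo, the 16-byte buffer, and the compact threshold of precision <= 3 are assumptions made for the sketch. In the compact layout the 8-byte field slot holds the epoch milliseconds directly; in the non-compact layout it holds an offset (high 32 bits) and the nanos of the millisecond (low 32 bits), with the milliseconds stored at that offset.

```java
import java.nio.ByteBuffer;

// Hypothetical, simplified model of the two timestamp layouts.
final class TimestampLayoutDemo {

    // Assumption: precisions up to 3 use the compact layout.
    static boolean isCompact(int precision) {
        return precision <= 3;
    }

    // Bytes 0..7 are the fixed field slot, bytes 8..15 the variable-length part.
    static ByteBuffer write(long epochMillis, int nanoOfMilli, int precision) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        if (isCompact(precision)) {
            buf.putLong(0, epochMillis);
        } else {
            int varOffset = 8;
            buf.putLong(varOffset, epochMillis);
            buf.putLong(0, ((long) varOffset << 32) | nanoOfMilli);
        }
        return buf;
    }

    static long readMillis(ByteBuffer buf, int precision) {
        long slot = buf.getLong(0);
        if (isCompact(precision)) {
            return slot;
        }
        // For a slot written in compact form, these high bits are part of
        // the millisecond value, not a valid offset.
        int varOffset = (int) (slot >> 32);
        return buf.getLong(varOffset);
    }

    public static void main(String[] args) {
        ByteBuffer buf = write(1704067200000L, 0, 0); // written with precision 0
        System.out.println(readMillis(buf, 0));       // same precision: reads back fine
        try {
            readMillis(buf, 6);                       // read with the real column precision
        } catch (IndexOutOfBoundsException e) {
            System.out.println("layout mismatch: " + e);
        }
    }
}
```

In this model the read with precision 6 interprets part of the millisecond value as an offset and fails with an IndexOutOfBoundsException. The exact exception in the real implementation may differ.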
Therefore, I think we should cache the table schema in the source and update it only when a SchemaChangeRecord arrives. That way, the schema used by SourceRecordEventDeserializer always matches the database.
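A rough sketch of the proposed caching follows. All names here (CachedSchemaDemo, onSchemaChange, schemaFor, and the use of plain strings for column types) are hypothetical and are not the actual Flink CDC API. The point is only that data records look up a cached schema, and nothing but a schema change event replaces it:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a per-table schema cache held by the source.
final class CachedSchemaDemo {

    // Table id -> column types as declared in the database.
    private final Map<String, List<String>> schemas = new HashMap<>();

    // Called only when a schema change event arrives for the table.
    void onSchemaChange(String tableId, List<String> columnTypes) {
        schemas.put(tableId, Collections.unmodifiableList(new ArrayList<>(columnTypes)));
    }

    // Called for every data record; no inference from the record's value.
    List<String> schemaFor(String tableId) {
        List<String> schema = schemas.get(tableId);
        if (schema == null) {
            throw new IllegalStateException("No cached schema for table " + tableId);
        }
        return schema;
    }

    public static void main(String[] args) {
        CachedSchemaDemo cache = new CachedSchemaDemo();
        cache.onSchemaChange("db.orders", Arrays.asList("INT", "TIMESTAMP_LTZ(6)"));
        // The declared precision is used even if a value has no fractional seconds.
        System.out.println(cache.schemaFor("db.orders"));
    }
}
```

With this shape, the converter for a table can be built once per schema version instead of once per record, and the timestamp precision comes from the declared column type.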