FLINK-35715: MySQL source should support a schema cache to deserialize records

    Description

       

      Currently, DebeziumEventDeserializationSchema deserializes each record with a schema inferred from the record itself:

       

      private RecordData extractDataRecord(Struct value, Schema valueSchema) throws Exception {
          // Schema inference runs for every incoming record...
          DataType dataType = schemaDataTypeInference.infer(value, valueSchema);
          // ...and the converter is then looked up (or created) from the inferred type.
          return (RecordData) getOrCreateConverter(dataType).convert(value, valueSchema);
      }
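
      Even if getOrCreateConverter memoizes converters by DataType, the inference step still runs on the hot path. A minimal sketch of that call pattern, with simplified stand-in types (the real code uses Struct, Schema, and DataType; the class name here is made up):

      import java.util.Map;
      import java.util.concurrent.ConcurrentHashMap;
      import java.util.function.Function;

      class PerRecordInferenceSketch {
          private final Map<String, Function<Object, Object>> converters = new ConcurrentHashMap<>();

          Object extractDataRecord(Object value, Object valueSchema) {
              String dataType = infer(value, valueSchema);          // runs for every record
              return converters
                      .computeIfAbsent(dataType, t -> v -> v)       // converter creation is amortized...
                      .apply(value);                                // ...but inference is not
          }

          private String infer(Object value, Object valueSchema) {
              return "STRING"; // placeholder for schemaDataTypeInference.infer(...)
          }
      }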
       

      There are some issues with this approach:

      1. Inferring the schema and creating a converter for every incoming record adds per-record overhead.
      2. A schema inferred from a single record may not reflect the real table schema. For instance, a MySQL timestamp column declared with precision 6 can carry a value whose fractional-second part happens to be zero; inferred from that value alone, the type appears to have precision 0:
      protected DataType inferString(Object value, Schema schema) {
          if (ZonedTimestamp.SCHEMA_NAME.equals(schema.name())) {
              // Derive the precision from the nanosecond part of this particular value.
              int nano =
                      Optional.ofNullable((String) value)
                              .map(s -> ZonedTimestamp.FORMATTER.parse(s, Instant::from))
                              .map(Instant::getNano)
                              .orElse(0);

              int precision;
              if (nano == 0) {
                  // A zero fractional part is indistinguishable from a precision-0 column.
                  precision = 0;
              } else if (nano % 1000 > 0) {
                  precision = 9;
              } else if (nano % 1000_000 > 0) {
                  precision = 6;
              } else if (nano % 1000_000_000 > 0) {
                  precision = 3;
              } else {
                  precision = 0;
              }
              return DataTypes.TIMESTAMP_LTZ(precision);
          }
          return DataTypes.STRING();
      }
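
      To make the mis-inference concrete, here is a small self-contained illustration that mimics the branch above (using java.time directly instead of Debezium's ZonedTimestamp.FORMATTER; the class name is made up for the example):

      import java.time.Instant;
      import java.time.format.DateTimeFormatter;

      public class PrecisionInferenceDemo {
          public static void main(String[] args) {
              // A value read from a MySQL TIMESTAMP(6) column whose fractional seconds are zero.
              String value = "2024-01-01T00:00:00Z";
              int nano = DateTimeFormatter.ISO_OFFSET_DATE_TIME
                      .parse(value, Instant::from)
                      .getNano();
              // nano == 0, so the inference above takes the first branch and yields
              // TIMESTAMP_LTZ(0), even though the column is declared with precision 6.
              System.out.println("nano = " + nano); // prints: nano = 0
          }
      }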

      However, timestamps with different precisions use different binary layouts in BinaryRecordData. Writing a value as a precision-0 timestamp (compact layout, epoch millis stored inline) and then reading it back with precision 6 (non-compact layout) makes the reader misinterpret the stored bytes, and an exception is thrown.

       

      //org.apache.flink.cdc.common.data.binary.BinaryRecordData#getTimestamp
      @Override
      public TimestampData getTimestamp(int pos, int precision) {
          assertIndexIsValid(pos);

          // Compact layout: the fixed-length slot holds the epoch millis directly.
          if (TimestampData.isCompact(precision)) {
              return TimestampData.fromMillis(segments[0].getLong(getFieldOffset(pos)));
          }

          // Non-compact layout: the slot holds an offset into the variable-length part
          // combined with the nano-of-milli component.
          int fieldOffset = getFieldOffset(pos);
          final long offsetAndNanoOfMilli = segments[0].getLong(fieldOffset);
          return BinarySegmentUtils.readTimestampData(segments, offset, offsetAndNanoOfMilli);
      }
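
      A self-contained sketch of why the mismatch fails (a hypothetical writer/reader standing in for BinaryRecordData; it only imitates the two layouts):

      import java.nio.ByteBuffer;

      public class TimestampLayoutMismatch {
          public static void main(String[] args) {
              ByteBuffer fixedLengthSlot = ByteBuffer.allocate(8);

              // Writer inferred precision 0 -> compact layout: the 8-byte slot is epoch millis.
              long epochMillis = 1_719_849_600_000L; // 2024-07-01T16:00:00Z
              fixedLengthSlot.putLong(0, epochMillis);

              // Reader expects precision 6 -> non-compact layout: the same 8 bytes are
              // reinterpreted as (variable-part offset << 32 | nanoOfMilli).
              long offsetAndNanoOfMilli = fixedLengthSlot.getLong(0);
              int bogusOffset = (int) (offsetAndNanoOfMilli >> 32);
              System.out.println("bogus variable-part offset = " + bogusOffset); // 400
              // Dereferencing this bogus offset is what makes getTimestamp throw.
          }
      }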

      Thus, I think we should cache the table schema in the source and update the cache only when a SchemaChangeRecord arrives. That way, the schema used by SourceRecordEventDeserializer always matches the actual database schema.
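
      A sketch of the proposed caching, with table id, schema, and record types simplified to String/Object (the class and method names are illustrative, not the actual implementation):

      import java.util.HashMap;
      import java.util.Map;
      import java.util.function.Function;

      class SchemaCachingDeserializer {
          private final Map<String, String> schemaCache = new HashMap<>();
          private final Map<String, Function<Object, Object>> converterCache = new HashMap<>();

          /** On a schema change event: refresh the cached schema and drop the stale converter. */
          void onSchemaChange(String tableId, String newSchema) {
              schemaCache.put(tableId, newSchema);
              converterCache.remove(tableId); // rebuilt lazily on the next data record
          }

          /** Deserialize a data change record with the cached, database-accurate schema. */
          Object deserialize(String tableId, Object record) {
              String schema = schemaCache.get(tableId);
              Function<Object, Object> converter =
                      converterCache.computeIfAbsent(tableId, id -> buildConverter(schema));
              return converter.apply(record);
          }

          /** Placeholder for building a converter from the cached table schema. */
          private Function<Object, Object> buildConverter(String schema) {
              return record -> record;
          }
      }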

            People

              ruanhang1993 Ruan Hang
              loserwang1024 Hongshun Wang