Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-33129

Can't create RowDataToAvroConverter for LocalZonedTimestampType logical type

    XMLWordPrintableJSON

Details

    Description

      While creating converter using `RowDataToAvroConverters.createConverter` with LocalZonedTimestampType logical type, the method will throw exception. This is because the switch clause is missing a clause for `LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZON`.

      Code: https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java#L75

       

      We can convert the value to `LocalDateTime` and then `TimestampData` using method below. Then we can apply the same converter as 
      TIMESTAMP_WITHOUT_TIME_ZONE? 
       
      `TimestampData fromLocalDateTime(LocalDateTime dateTime)`

      Can Flink team help adding the support for this logical type and logical type root?

      This is now a blocker for creating Flink Iceberg consumer with Avro GenericRecord when IcebergTable has `TimestampTZ` type field which will be converted to LocalZonedTimestampType.

      See error below:
      Unsupported type: TIMESTAMP_LTZ(6)
      stack: [ [-]
      org.apache.flink.formats.avro.RowDataToAvroConverters.createConverter(RowDataToAvroConverters.java:186)
      java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
      java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
      java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
      java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
      java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:550)
      java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
      java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:517)
      org.apache.flink.formats.avro.RowDataToAvroConverters.createRowConverter(RowDataToAvroConverters.java:224)
      org.apache.flink.formats.avro.RowDataToAvroConverters.createConverter(RowDataToAvroConverters.java:178)
      org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter.<init>(RowDataToAvroGenericRecordConverter.java:46)
      org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter.fromIcebergSchema(RowDataToAvroGenericRecordConverter.java:60)
      org.apache.iceberg.flink.source.reader.AvroGenericRecordReaderFunction.lazyConverter(AvroGenericRecordReaderFunction.java:93)
      org.apache.iceberg.flink.source.reader.AvroGenericRecordReaderFunction.createDataIterator(AvroGenericRecordReaderFunction.java:85)
      org.apache.iceberg.flink.source.reader.DataIteratorReaderFunction.apply(DataIteratorReaderFunction.java:39)
      org.apache.iceberg.flink.source.reader.DataIteratorReaderFunction.apply(DataIteratorReaderFunction.java:27)
      org.apache.iceberg.flink.source.reader.IcebergSourceSplitReader.fetch(IcebergSourceSplitReader.java:74)
      org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
      org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
      org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
      java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      java.util.concurrent.FutureTask.run(FutureTask.java:264)
      java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      java.lang.Thread.run(Thread.java:829)

      Attachments

        Activity

          People

            Unassigned Unassigned
            neuscottshao Zhaoyang Shao
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:

              Time Tracking

                Estimated:
                Original Estimate - 1h
                1h
                Remaining:
                Remaining Estimate - 1h
                1h
                Logged:
                Time Spent - Not Specified
                Not Specified