Apache Hudi / HUDI-3184

hudi-flink support timestamp-micros


Details

    Description

      Problem overview

      Steps to reproduce the behavior:

① Write data into the Hudi table with the Spark engine (note: the dataset contains timestamp-typed columns).

② Read the Hudi table written in step ① with the Flink engine.

Expected behavior

Reading the table through Flink should work without losing timestamp precision. Instead, the following exception is thrown:

      Caused by: java.lang.IllegalArgumentException: Avro does not support TIMESTAMP type with precision: 6, it only supports precision less than 3.
      at org.apache.hudi.util.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:221) ~...
      at org.apache.hudi.util.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:263) ~...
      at org.apache.hudi.util.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:169) ~...
      at org.apache.hudi.table.HoodieTableFactory.inferAvroSchema(HoodieTableFactory.java:239) ~...
      at org.apache.hudi.table.HoodieTableFactory.setupConfOptions(HoodieTableFactory.java:155) ~...
      at org.apache.hudi.table.HoodieTableFactory.createDynamicTableSource(HoodieTableFactory.java:65) ~...
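
The rejection comes from the Avro schema conversion in hudi-flink, which currently only maps timestamps up to millisecond precision. Below is a minimal Scala paraphrase of that check, for illustration only; the real logic lives in the Java class org.apache.hudi.util.AvroSchemaConverter#convertToSchema.

import org.apache.avro.{LogicalTypes, Schema}

// Illustrative paraphrase of the precision check behind the exception above.
def timestampToAvro(precision: Int): Schema =
  if (precision <= 3) {
    // Only millisecond precision is mapped to an Avro logical type today...
    LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
  } else {
    // ...anything finer, e.g. the TIMESTAMP(6) column written by Spark, is rejected.
    throw new IllegalArgumentException(
      s"Avro does not support TIMESTAMP type with precision: $precision, " +
        "it only supports precision less than 3.")
  }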

      Environment Description

        Hudi version : 0.11.0-SNAPSHOT

        Spark version : 3.1.2

        Flink version : 1.13.1

        Hive version : None

        Hadoop version : 2.9.2

        Storage (HDFS/S3/GCS..) : HDFS

        Running on Docker? (yes/no) : None

      Additional context

We use Hudi as a data lake when delivering projects to customers. A common scenario is to write data into a Hudi table with the Spark engine and then read it back from the same table with the Flink engine.

Note that the exception above is triggered whenever the dataset written by Spark contains a timestamp column.

To simplify the description, we reduce the problem to the following steps:

【step-1】Mock data with Spark:

      /home/deploy/spark-3.1.2-bin-hadoop2.7/bin/spark-shell \
      --driver-class-path /home/workflow/apache-hive-2.3.8-bin/conf/ \
      --master spark://2-120:7077 \
      --executor-memory 4g \
      --driver-memory 4g \
      --num-executors 4 \
      --total-executor-cores 4 \
      --name test \
      --jars /home/deploy/spark-3.1.2-bin-hadoop2.7/jars/hudi-spark3-bundle_2.12-0.11.0-SNAPSHOT.jar,/home/deploy/spark-3.1.2-bin-hadoop2.7/jars/spark-avro_2.12-3.1.2.jar \
      --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
      --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED \
--conf spark.sql.hive.convertMetastoreParquet=false

// Inside the spark-shell session:
val df = spark.sql("select 1 as id, 'A' as name, current_timestamp as dt")
      
      df.write.format("hudi").
        option("hoodie.datasource.write.recordkey.field", "id").
        option("hoodie.datasource.write.precombine.field", "id").
        option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
        option("hoodie.upsert.shuffle.parallelism", "2").
        option("hoodie.table.name", "timestamp_table").
        mode("append").
        save("/hudi/suite/data_type_timestamp_table")
      
      spark.read.format("hudi").load("/hudi/suite/data_type_timestamp_table").show(false) 
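
Before moving on to Flink, the physical type that step-1 actually wrote can be checked from the same spark-shell by reading a Parquet footer directly. This is only a sketch: the data file name below is a placeholder, pick any parquet file that Hudi created under the table path.

import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// Placeholder file name: substitute one of the data files under the table path.
val dataFile = new Path("/hudi/suite/data_type_timestamp_table/<any-data-file>.parquet")
val reader = ParquetFileReader.open(
  HadoopInputFile.fromPath(dataFile, spark.sparkContext.hadoopConfiguration))
try {
  // For the `dt` column this should show a microsecond timestamp annotation,
  // i.e. the timestamp-micros type described further below.
  println(reader.getFooter.getFileMetaData.getSchema)
} finally {
  reader.close()
}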

【step-2】Consume the data through Flink:

bin/sql-client.sh embedded -j lib/hudi-flink-bundle_2.12-0.11.0-SNAPSHOT.jar

-- Inside the Flink SQL client:
create table data_type_timestamp_table (
        `id` INT,
        `name` STRING,
        `dt` TIMESTAMP(6)
      ) with (
        'connector' = 'hudi',
        'hoodie.table.name' = 'data_type_timestamp_table',
        'read.streaming.enabled' = 'true',
        'hoodie.datasource.write.recordkey.field' = 'id',
        'path' = '/hudi/suite/data_type_timestamp_table',
        'read.streaming.check-interval' = '10',
        'table.type' = 'COPY_ON_WRITE',
        'write.precombine.field' = 'id'
      );
      
      select * from data_type_timestamp_table; 

The query result is shown in the attached screenshot.

If we change TIMESTAMP(6) to TIMESTAMP(3), the result is as shown in the attached screenshots:

The data can now be read, but the displayed timestamps are incorrect!

Inspecting the data files under the Hudi table directory (see the attached screenshot) shows that Spark writes the timestamp column with the timestamp-micros logical type.
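
For reference, the two Avro logical types involved can be built directly with the Avro API; this small sketch only shows how the two schemas differ and is not part of the reproduction.

import org.apache.avro.{LogicalTypes, Schema}

// The logical type Spark attaches to the timestamp column...
val micros = LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG))
// ...versus the millisecond variant discussed in the next paragraph.
val millis = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))

println(micros)  // {"type":"long","logicalType":"timestamp-micros"}
println(millis)  // {"type":"long","logicalType":"timestamp-millis"}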

However, hudi-flink reads and writes Hudi data with the timestamp-millis logical type! As a result, reading and writing timestamp columns across the Spark and Flink engines is problematic. We hope the hudi-flink module can support timestamp-micros so that no time precision is lost.
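
Our reading of why the TIMESTAMP(3) workaround displays wrong values (a minimal illustration, not the exact Flink code path): the file stores the epoch value in microseconds, but a reader that assumes timestamp-millis interprets that same long as milliseconds.

import java.time.Instant

// Hypothetical stored value: 2022-01-03T08:00:00Z as microseconds since the epoch.
val writtenMicros = 1641196800000000L
// Decoded with the correct unit (micros -> millis first):
val correct = Instant.ofEpochMilli(writtenMicros / 1000L)   // 2022-01-03T08:00:00Z
// Decoded by a reader that assumes the value is already in milliseconds:
val misread = Instant.ofEpochMilli(writtenMicros)            // a date tens of thousands of years in the future

println(correct)
println(misread)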

      Attachments

        1. 1.png
          29 kB
          Well Tang
        2. 2.png
          4 kB
          Well Tang
        3. 3.png
          54 kB
          Well Tang

People

  Assignee: Well Tang (tangchenhao)
  Reporter: Well Tang (tangchenhao)
  Votes: 0
  Watchers: 2


Time Tracking

  Original Estimate: 120h
  Remaining Estimate: 120h
  Time Spent: Not Specified