Apache Hudi / HUDI-6052

Standardise TIMESTAMP(6) format when writing to Parquet files

    Description

      APPEND-ONLY MODE

       

      When a TIMESTAMP(6) column is used in an APPEND-ONLY pipeline with inline clustering enabled, the error below is thrown:

       

       

      Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file 
          at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
          at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811)
          at java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:295)
          at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:207)
          at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:162)
          at java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:301)
          at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
          at org.apache.hudi.client.utils.ConcatenatingIterator.hasNext(ConcatenatingIterator.java:45)
          at org.apache.hudi.sink.clustering.ClusteringOperator.doClustering(ClusteringOperator.java:307)
          at org.apache.hudi.sink.clustering.ClusteringOperator.processElement(ClusteringOperator.java:240)
          at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
          at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
          at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
          at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:524)
          at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:758)
          at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:951)
          at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:930)
          at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:744)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
          at java.lang.Thread.run(Thread.java:750)
      Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file file:/var/folders/p_/09zfm5sx3v14w97hhk4vqrn8s817xt/T/junit5996224223926304717/par2/3cc78c96-2823-46fb-ab8c-7106edd55fc7-0_1-4-0_20230410162304415.parquet
          at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254)
          at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
          at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
          at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
          ... 22 more
      Caused by: java.lang.UnsupportedOperationException: org.apache.parquet.avro.AvroConverters$FieldLongConverter
          at org.apache.parquet.io.api.PrimitiveConverter.addBinary(PrimitiveConverter.java:70)
          at org.apache.parquet.column.impl.ColumnReaderBase$2$6.writeValue(ColumnReaderBase.java:390)
          at org.apache.parquet.column.impl.ColumnReaderBase.writeCurrentValueToConverter(ColumnReaderBase.java:440)
          at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:30)
          at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
          at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:229)
          ... 25 more
      Process finished with exit code 255 
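
      In other words, the clustering read path goes through parquet-avro, whose converter for the timestamp field only accepts numeric (INT64) values, while the file written by the append path stores the column as INT96, which the reader delivers as a 12-byte Binary. A minimal sketch of that mismatch (not Hudi code; the class name and values below are made up):

      // Minimal sketch: a converter that, like parquet-avro's FieldLongConverter for a
      // timestamp-micros field, only handles numeric values. Feeding it the 12-byte
      // Binary of an INT96 column falls through to PrimitiveConverter.addBinary(),
      // which throws UnsupportedOperationException -- matching the stack trace above.
      import org.apache.parquet.io.api.Binary;
      import org.apache.parquet.io.api.PrimitiveConverter;

      public class LongOnlyConverterSketch extends PrimitiveConverter {

        @Override
        public void addLong(long value) {
          // INT64 (TIMESTAMP_MICROS) values land here and are handled fine.
          System.out.println("read INT64 timestamp value: " + value);
        }

        // addBinary(Binary) is deliberately NOT overridden, so INT96 values hit the
        // base-class default, which throws UnsupportedOperationException.

        public static void main(String[] args) {
          PrimitiveConverter converter = new LongOnlyConverterSketch();
          converter.addLong(1_681_111_384_000_000L);                       // works (INT64 path)
          converter.addBinary(Binary.fromConstantByteArray(new byte[12])); // throws (INT96 path)
        }
      }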

       

       

      Sample code to trigger this:

       

      CREATE TABLE `src_table` (
        `id` INT,
        `userId` INT,
        `name` STRING,
        `timestamp_col` TIMESTAMP(6)
      )
      WITH (
          'connector' = 'datagen',
          'rows-per-second' = '50'
      );
      
      -- will write TIMESTAMP(6) type as INT96
      CREATE TABLE `sink_table` 
      (
        `id` INT,
        `userId` INT,
        `name` STRING,
        `timestamp_col` TIMESTAMP(6)
      )
      WITH (
        'connector' = 'hudi',
        'path' = 'hdfs://path/to/table/',
        'table.type' = 'COPY_ON_WRITE',
        'write.operation' = 'insert',  
        'hoodie.datasource.write.recordkey.field' = 'id',
        'hive_sync.enable' = 'false',
        'hoodie.datasource.write.hive_style_partitioning' = 'true',
        'clustering.async.enabled' = 'true', -- enable inline clustering
        'clustering.schedule.enabled'= 'true', -- enable clustering schedule
        'clustering.delta_commits'='4', -- schedule clustering every 4 commits
        'hoodie.clustering.plan.strategy.small.file.limit'='104857600' -- only rewrite files smaller than 100MB
      );
      
      insert into sink_table
      select 
        *
      from src_table;

       

      After looking through the code, we realised that the same TIMESTAMP(6) column is written to Parquet as INT96 when AppendWriteFunction is used.
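
      Concretely, the two Parquet column shapes involved can be written out with parquet-mr's schema builder: the unannotated INT96 column that the append path currently produces, and the INT64 TIMESTAMP(MICROS) column that the upsert path produces (see the UPSERT MODE section below). This is an illustrative sketch only, not how Hudi builds its schemas; the class name is made up.

      import org.apache.parquet.schema.LogicalTypeAnnotation;
      import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
      import org.apache.parquet.schema.PrimitiveType;
      import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
      import org.apache.parquet.schema.Types;

      public class TimestampColumnShapes {
        public static void main(String[] args) {
          // Legacy representation: raw INT96 with no logical-type annotation
          // (what the parquet-tools snippet below reports for the append path).
          PrimitiveType int96Col =
              Types.optional(PrimitiveTypeName.INT96).named("timestamp_col");

          // Annotated representation: INT64 carrying TIMESTAMP(MICROS, adjusted to UTC)
          // (what the upsert path writes).
          PrimitiveType int64Col =
              Types.optional(PrimitiveTypeName.INT64)
                   .as(LogicalTypeAnnotation.timestampType(true, TimeUnit.MICROS))
                   .named("timestamp_col");

          System.out.println(int96Col);
          System.out.println(int64Col);
        }
      }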

       

      Snippet extracted from parquet-tools to show the physical type in parquet:

       

      ############ Column(timestamp_col)[row group 0] ############
      name: timestamp_col
      path: timestamp_col
      max_definition_level: 1
      max_repetition_level: 0
      physical_type: INT96
      logical_type: None
      converted_type (legacy): NONE
      compression: GZIP (space_saved: 55%)
      total_compressed_size: 1102
      total_uncompressed_size: 2444 
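
      The same check can be done programmatically by reading the Parquet footer with parquet-mr; a sketch (the class name is made up, and the file path must point at one of the table's actual parquet files):

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.parquet.hadoop.ParquetFileReader;
      import org.apache.parquet.hadoop.util.HadoopInputFile;
      import org.apache.parquet.schema.MessageType;

      public class PrintTimestampPhysicalType {
        public static void main(String[] args) throws Exception {
          // Hypothetical location -- pass the real parquet file as the first argument.
          Path file = new Path(args[0]);

          try (ParquetFileReader reader =
                   ParquetFileReader.open(HadoopInputFile.fromPath(file, new Configuration()))) {
            MessageType schema = reader.getFooter().getFileMetaData().getSchema();
            // Prints each column's physical type plus any logical-type annotation,
            // e.g. INT96 (none) vs. INT64 (TIMESTAMP(MICROS,true)).
            schema.getColumns().forEach(col ->
                System.out.println(String.join(".", col.getPath()) + " -> " + col.getPrimitiveType()));
          }
        }
      }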

       


       

      UPSERT MODE

       

      However, if StreamWriteFunction is used (i.e. in UPSERT mode), TIMESTAMP(6) columns are written to Parquet as INT64.

       

      One can reproduce this using the code below (identical to the APPEND-ONLY example, except that write.operation is set to upsert):

       

      CREATE TABLE `src_table` (
        `id` INT,
        `userId` INT,
        `name` STRING,
        `timestamp_col` TIMESTAMP(6)
      )
      WITH (
          'connector' = 'datagen',
          'rows-per-second' = '50'
      );
      
      -- will write TIMESTAMP(6) type as INT64
      CREATE TABLE `sink_table` 
      (
        `id` INT,
        `userId` INT,
        `name` STRING,
        `timestamp_col` TIMESTAMP(6)
      )
      WITH (
        'connector' = 'hudi',
        'path' = 'hdfs://path/to/table/',
        'table.type' = 'COPY_ON_WRITE',
        'write.operation' = 'upsert',
        'hoodie.datasource.write.recordkey.field' = 'id',
        'hive_sync.enable' = 'false',
        'hoodie.datasource.write.hive_style_partitioning' = 'true',
        'clustering.async.enabled' = 'true', -- enable inline clustering
        'clustering.schedule.enabled'= 'true', -- enable clustering schedule
        'clustering.delta_commits'='4', -- schedule clustering every 4 commits
        'hoodie.clustering.plan.strategy.small.file.limit'='104857600' -- only rewrite files smaller than 100MB
      );
      
      insert into sink_table
      select 
        *
      from src_table; 

       

       

      Snippet extracted from parquet-tools to show the physical type in parquet:

      ############ Column(timestamp_col)[row group 0] ############
      name: timestamp_col
      path: timestamp_col
      max_definition_level: 1
      max_repetition_level: 0
      physical_type: INT64
      logical_type: Timestamp(isAdjustedToUTC=true, timeUnit=microseconds, is_from_converted_type=false, force_set_converted_type=false)
      converted_type (legacy): TIMESTAMP_MICROS
      compression: GZIP (space_saved: 26%)
      total_compressed_size: 1228
      total_uncompressed_size: 1654 
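
      The INT64 / TIMESTAMP_MICROS output above matches what parquet-avro derives from an Avro long with the timestamp-micros logical type, which is consistent with the upsert path writing records through Hudi's Avro-based Parquet write support. A small sketch of that mapping (illustrative only; the record and class names are made up):

      import org.apache.avro.LogicalTypes;
      import org.apache.avro.Schema;
      import org.apache.avro.SchemaBuilder;
      import org.apache.parquet.avro.AvroSchemaConverter;
      import org.apache.parquet.schema.MessageType;

      public class AvroTimestampMappingSketch {
        public static void main(String[] args) {
          // An Avro long carrying the timestamp-micros logical type, the usual Avro
          // representation of a TIMESTAMP(6) column.
          Schema tsMicros =
              LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));

          Schema record = SchemaBuilder.record("sink_table_record").fields()
              .name("timestamp_col").type(tsMicros).noDefault()
              .endRecord();

          // parquet-avro converts this to an INT64 column annotated as TIMESTAMP(MICROS),
          // matching the parquet-tools output above.
          MessageType parquetSchema = new AvroSchemaConverter().convert(record);
          System.out.println(parquetSchema);
        }
      }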

       

            People

              Assignee: voonhous voon
              Reporter: voonhous voon
