Details

Type: Bug
Status: Closed
Priority: Major
Resolution: Fixed
Description
APPEND-ONLY MODE
When a TIMESTAMP(6) column is used in an APPEND-ONLY pipeline with inline clustering enabled, the error below is thrown:
Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
    at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811)
    at java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:295)
    at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:207)
    at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:162)
    at java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:301)
    at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
    at org.apache.hudi.client.utils.ConcatenatingIterator.hasNext(ConcatenatingIterator.java:45)
    at org.apache.hudi.sink.clustering.ClusteringOperator.doClustering(ClusteringOperator.java:307)
    at org.apache.hudi.sink.clustering.ClusteringOperator.processElement(ClusteringOperator.java:240)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:524)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:758)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:951)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:930)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:744)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file file:/var/folders/p_/09zfm5sx3v14w97hhk4vqrn8s817xt/T/junit5996224223926304717/par2/3cc78c96-2823-46fb-ab8c-7106edd55fc7-0_1-4-0_20230410162304415.parquet
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
    ... 22 more
Caused by: java.lang.UnsupportedOperationException: org.apache.parquet.avro.AvroConverters$FieldLongConverter
    at org.apache.parquet.io.api.PrimitiveConverter.addBinary(PrimitiveConverter.java:70)
    at org.apache.parquet.column.impl.ColumnReaderBase$2$6.writeValue(ColumnReaderBase.java:390)
    at org.apache.parquet.column.impl.ColumnReaderBase.writeCurrentValueToConverter(ColumnReaderBase.java:440)
    at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:30)
    at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:229)
    ... 25 more

Process finished with exit code 255
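The last cause in the trace points at the actual mismatch: the clustering reader hands an Avro schema in which TIMESTAMP(6) maps to a long annotated as timestamp-micros to an Avro-based Parquet reader, while the file stores the column as INT96, which Parquet delivers as a binary value via addBinary(). The following is only a minimal, hypothetical sketch of that mismatch outside of Hudi; the class name, file path, and Avro schema are assumptions, and it only reproduces the same UnsupportedOperationException against a file whose timestamp_col is really stored as INT96 (e.g. one produced by the append-only pipeline below).

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;

public class Int96ReadMismatchSketch {
  public static void main(String[] args) throws Exception {
    // Avro read schema comparable to what the clustering path derives from the
    // table schema: timestamp_col is a long with the timestamp-micros logical type.
    Schema tsMicros = LogicalTypes.timestampMicros()
        .addToSchema(Schema.create(Schema.Type.LONG));
    Schema readSchema = SchemaBuilder.record("row").fields()
        .requiredInt("id")
        .requiredInt("userId")
        .requiredString("name")
        .name("timestamp_col").type(tsMicros).noDefault()
        .endRecord();

    Configuration conf = new Configuration();
    // Force the Avro converters to be built from the long-based schema instead of
    // the schema derived from the file footer.
    AvroReadSupport.setAvroReadSchema(conf, readSchema);

    // Hypothetical path to a base file whose timestamp_col is stored as INT96.
    try (ParquetReader<GenericRecord> reader = AvroParquetReader
        .<GenericRecord>builder(new Path("/tmp/int96_sample.parquet"))
        .withConf(conf)
        .build()) {
      // Parquet reads INT96 as a binary value and calls addBinary() on the
      // FieldLongConverter, which only supports addLong(), so this throws
      // java.lang.UnsupportedOperationException as in the stack trace above.
      reader.read();
    }
  }
}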
Sample code to trigger this:
CREATE TABLE `src_table` (
  `id` INT,
  `userId` INT,
  `name` STRING,
  `timestamp_col` TIMESTAMP(6)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '50'
);

-- will write TIMESTAMP(6) type as INT96
CREATE TABLE `sink_table` (
  `id` INT,
  `userId` INT,
  `name` STRING,
  `timestamp_col` TIMESTAMP(6)
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://path/to/table/',
  'table.type' = 'COPY_ON_WRITE',
  'write.operation' = 'insert',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'hive_sync.enable' = 'false',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'clustering.async.enabled' = 'true',    -- enable inline clustering
  'clustering.schedule.enabled' = 'true', -- enable clustering schedule
  'clustering.delta_commits' = '4',       -- schedule clustering every 4 commits
  'hoodie.clustering.plan.strategy.small.file.limit' = '104857600' -- only rewrite files smaller than 100MB
);

INSERT INTO sink_table SELECT * FROM src_table;
After looking through the code, we realised that the TIMESTAMP(6) type is written as INT96 to Parquet when AppendWriteFunction is used.
Snippet extracted from parquet-tools to show the physical type in parquet:
############ Column(timestamp_col)[row group 0] ############
name: timestamp_col
path: timestamp_col
max_definition_level: 1
max_repetition_level: 0
physical_type: INT96
logical_type: None
converted_type (legacy): NONE
compression: GZIP (space_saved: 55%)
total_compressed_size: 1102
total_uncompressed_size: 2444
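The same physical type can also be confirmed programmatically from the Parquet footer, which may be easier to script than parquet-tools. This is only a small verification sketch; the class name and file path are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

public class PrintTimestampPhysicalType {
  public static void main(String[] args) throws Exception {
    // Placeholder path: point this at one of the base files written by the pipeline above.
    Path path = new Path("file:///path/to/base_file.parquet");
    try (ParquetFileReader reader =
             ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
      MessageType schema = reader.getFooter().getFileMetaData().getSchema();
      // For the append-only (insert) path this prints an int96 column declaration for
      // timestamp_col; for the upsert path it prints an int64 column annotated as
      // timestamp-micros, matching the parquet-tools output in this report.
      System.out.println(schema.getType("timestamp_col"));
    }
  }
}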
UPSERT MODE
However, if StreamWriteFunction is used, TIMESTAMP(6) types are written as INT64 to Parquet.
This can be reproduced with the code below (the same pipeline, with the write.operation value changed to update):
CREATE TABLE `src_table` (
  `id` INT,
  `userId` INT,
  `name` STRING,
  `timestamp_col` TIMESTAMP(6)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '50'
);

-- will write TIMESTAMP(6) type as INT64
CREATE TABLE `sink_table` (
  `id` INT,
  `userId` INT,
  `name` STRING,
  `timestamp_col` TIMESTAMP(6)
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://path/to/table/',
  'table.type' = 'COPY_ON_WRITE',
  'write.operation' = 'update',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'hive_sync.enable' = 'false',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'clustering.async.enabled' = 'true',    -- enable inline clustering
  'clustering.schedule.enabled' = 'true', -- enable clustering schedule
  'clustering.delta_commits' = '4',       -- schedule clustering every 4 commits
  'hoodie.clustering.plan.strategy.small.file.limit' = '104857600' -- only rewrite files smaller than 100MB
);

INSERT INTO sink_table SELECT * FROM src_table;
Snippet extracted from parquet-tools to show the physical type in parquet:
############ Column(timestamp_col)[row group 0] ############
name: timestamp_col
path: timestamp_col
max_definition_level: 1
max_repetition_level: 0
physical_type: INT64
logical_type: Timestamp(isAdjustedToUTC=true, timeUnit=microseconds, is_from_converted_type=false, force_set_converted_type=false)
converted_type (legacy): TIMESTAMP_MICROS
compression: GZIP (space_saved: 26%)
total_compressed_size: 1228
total_uncompressed_size: 1654
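Since the upsert path already shows the intended representation (INT64 annotated as timestamp-micros, adjusted to UTC), a regression-style check could assert it directly from the footer. This is only a sketch, not part of the actual fix; the class name and file path are placeholders, and it assumes parquet-mr's LogicalTypeAnnotation API is available.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;

public class AssertTimestampIsInt64 {
  public static void main(String[] args) throws Exception {
    // Placeholder path: a base file produced by the pipeline, in either insert or update mode.
    Path path = new Path("file:///path/to/base_file.parquet");
    try (ParquetFileReader reader =
             ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
      PrimitiveType col = reader.getFooter().getFileMetaData().getSchema()
          .getType("timestamp_col").asPrimitiveType();

      // Expected representation, as seen on the upsert/StreamWriteFunction path:
      // physical INT64 with a timestamp(isAdjustedToUTC=true, MICROS) annotation.
      boolean isInt64 = col.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT64;
      boolean isMicros = LogicalTypeAnnotation
          .timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS)
          .equals(col.getLogicalTypeAnnotation());

      if (!(isInt64 && isMicros)) {
        throw new AssertionError("timestamp_col is not INT64/timestamp-micros: " + col);
      }
    }
  }
}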