Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
0.8.0
-
Environment: Spark 2.4.5, Hive 3.1.1, Hadoop 3.1.1
Description
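Steps to reproduce. First, create a Hudi table (the issue occurs with both MERGE_ON_READ and COPY_ON_WRITE tables), bulk-insert a base dataset, then upsert a second dataset with Hive sync enabled: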
val base_data = spark.read.parquet("/tmp/tb_base")
val upsert_data = spark.read.parquet("/tmp/tb_upsert")
base_data.write.format("hudi").option(TABLE_TYPE_OPT_KEY, MOR_TABLE_TYPE_OPT_VAL).option(PRECOMBINE_FIELD_OPT_KEY, "col2").option(RECORDKEY_FIELD_OPT_KEY, "primary_key").option(PARTITIONPATH_FIELD_OPT_KEY, "col0").option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.SimpleKeyGenerator").option(OPERATION_OPT_KEY, "bulk_insert").option(HIVE_SYNC_ENABLED_OPT_KEY, "true").option(HIVE_PARTITION_FIELDS_OPT_KEY, "col0").option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor").option(HIVE_DATABASE_OPT_KEY, "testdb").option(HIVE_TABLE_OPT_KEY, "tb_test_mor_par").option(HIVE_USE_JDBC_OPT_KEY, "false").option("hoodie.bulkinsert.shuffle.parallelism", 4).option("hoodie.insert.shuffle.parallelism", 4).option("hoodie.upsert.shuffle.parallelism", 4).option("hoodie.delete.shuffle.parallelism", 4).option("hoodie.datasource.write.hive_style_partitioning", "true").option(TABLE_NAME, "tb_test_mor_par").mode(Overwrite).save(s"/tmp/testdb/tb_test_mor_par")
upsert_data.write.format("hudi").option(TABLE_TYPE_OPT_KEY, MOR_TABLE_TYPE_OPT_VAL).option(PRECOMBINE_FIELD_OPT_KEY, "col2").option(RECORDKEY_FIELD_OPT_KEY, "primary_key").option(PARTITIONPATH_FIELD_OPT_KEY, "col0").option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.SimpleKeyGenerator").option(OPERATION_OPT_KEY, "upsert").option(HIVE_SYNC_ENABLED_OPT_KEY, "true").option(HIVE_PARTITION_FIELDS_OPT_KEY, "col0").option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor").option(HIVE_DATABASE_OPT_KEY, "testdb").option(HIVE_TABLE_OPT_KEY, "tb_test_mor_par").option(HIVE_USE_JDBC_OPT_KEY, "false").option("hoodie.bulkinsert.shuffle.parallelism", 4).option("hoodie.insert.shuffle.parallelism", 4).option("hoodie.upsert.shuffle.parallelism", 4).option("hoodie.delete.shuffle.parallelism", 4).option("hoodie.datasource.write.hive_style_partitioning", "true").option(TABLE_NAME, "tb_test_mor_par").mode(Append).save(s"/tmp/testdb/tb_test_mor_par")
Then query the incremental view of the real-time (`_rt`) table through Spark SQL:
set hoodie.tb_test_mor_par.consume.mode=INCREMENTAL;
set hoodie.tb_test_mor_par.consume.start.timestamp=20210420145330;
set hoodie.tb_test_mor_par.consume.max.commits=3;
select _hoodie_commit_time,primary_key,col0,col1,col2,col3,col4,col5,col6,col7 from testdb.tb_test_mor_par_rt where _hoodie_commit_time > '20210420145330' order by primary_key;
------------------------------------------------+
_hoodie_commit_time | primary_key | col0 | col1 | col6 | col7 |
------------------------------------------------+
20210420155738 | 20 | 77 | sC | 1587887604000000 | 739 |
20210420155738 | 21 | 66 | ps | 1609790497000000 | 61 |
20210420155738 | 22 | 47 | 1P | 1584600429000000 | 835 |
20210420155738 | 23 | 36 | 5K | 1607634808000000 | 538 |
20210420155738 | 24 | 1 | BA | 1606857113000000 | 775 |
20210420155738 | 24 | 101 | BA | 1606857113000000 | 775 |
20210420155738 | 24 | 100 | BA | 1606857113000000 | 775 |
20210420155738 | 24 | 102 | BA | 1606857113000000 | 775 |
------------------------------------------------+
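The primary_key is repeated. The incremental query returns primary_key 24 four times, once for each of the partitions col0 = 1, 101, 100 and 102. Duplicate records therefore appear in the incremental view.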
Attachments
Issue Links
- links to