Details
- Type: Bug
- Status: Closed
- Priority: Blocker
- Resolution: Duplicate
Description
GitHub issue: https://github.com/apache/hudi/issues/10590
Reproduction code:
```
from typing import Any

from pyspark import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder \
    .appName("Hudi Basics") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.jars.packages", "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1") \
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()

sc = spark.sparkContext

table_name = "hudi_trips_cdc"
base_path = "/tmp/test_issue_10590_4"  # Replace with whatever path

quickstart_utils = sc._jvm.org.apache.hudi.QuickstartUtils
dataGen = quickstart_utils.DataGenerator()
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))

# NOTE: the write options were missing from this report as archived; the values
# below are an assumption, reconstructed from the Hudi quickstart with CDC
# logging enabled.
hudi_options = {
    "hoodie.table.name": table_name,
    "hoodie.datasource.write.recordkey.field": "uuid",
    "hoodie.datasource.write.partitionpath.field": "partitionpath",
    "hoodie.datasource.write.table.name": table_name,
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.table.cdc.enabled": "true",
}


def create_df():
    df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
    return df


def write_data():
    df = create_df()
    df.write.format("hudi") \
        .options(**hudi_options) \
        .mode("overwrite") \
        .save(base_path)


def update_data():
    updates = quickstart_utils.convertToStringList(dataGen.generateUpdates(10))
    df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
    # Assumption: the append reuses the same write options as the initial insert.
    df.write \
        .format("hudi") \
        .options(**hudi_options) \
        .mode("append") \
        .save(base_path)


def incremental_query():
    ordered_rows: list[Row] = spark.read \
        .format("hudi") \
        .load(base_path) \
        .select(col("_hoodie_commit_time").alias("commit_time")) \
        .orderBy(col("commit_time")) \
        .collect()
    commits: list[Any] = list(map(lambda row: row[0], ordered_rows))
    begin_time = commits[0]
    # NOTE: the read options were missing from this report as archived; the
    # values below are an assumption: a CDC-format incremental read starting
    # from the first commit.
    incremental_read_options = {
        "hoodie.datasource.query.type": "incremental",
        "hoodie.datasource.query.incremental.format": "cdc",
        "hoodie.datasource.read.begin.instanttime": begin_time,
    }
    trips_incremental_df = spark.read \
        .format("hudi") \
        .options(**incremental_read_options) \
        .load(base_path)
    # The error also occurs when using hudi_table_changes in 0.14.1:
    # sql_query = f"""SELECT * FROM hudi_table_changes ('{base_path}', 'cdc', 'earliest')"""
    # trips_incremental_df = spark.sql(sql_query)
    trips_incremental_df.show()
    trips_incremental_df.printSchema()


if __name__ == "__main__":
    write_data()
    update_data()
    incremental_query()
```
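For reference, the hudi_table_changes path mentioned in the commented-out lines above can also be run on its own. A minimal sketch, assuming the same Spark session (with the 0.14.1 bundle instead) and a CDC-enabled table already written to base_path:

```
# Sketch of the SQL table-valued-function path from the report (Hudi 0.14.1);
# assumes spark and base_path are defined as in the script above.
sql_query = f"SELECT * FROM hudi_table_changes ('{base_path}', 'cdc', 'earliest')"
trips_incremental_df = spark.sql(sql_query)
trips_incremental_df.show()
```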
Attachments
Issue Links
- duplicates: HUDI-7383 CDC query failed due to dependency issue (Closed)