Uploaded image for project: 'Apache Hudi'
  1. Apache Hudi
  2. HUDI-7360

Incremental CDC Query after 0.14.1 upgrade giving Jackson class incompatibility exception

    XMLWordPrintableJSON

Details

    Description

      GitHub Issue - https://github.com/apache/hudi/issues/10590

      Reproducible code 

      ```
      from typing import Any

      from pyspark import Row
      from pyspark.sql import SparkSession
      from pyspark.sql.functions import col

      spark = SparkSession.builder \
      .appName("Hudi Basics") \
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
      .config("spark.jars.packages", "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1") \
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
      .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
      .getOrCreate()

      sc = spark.sparkContext

      table_name = "hudi_trips_cdc"
      base_path = "/tmp/test_issue_10590_4" # Replace for whatever path
      quickstart_utils = sc._jvm.org.apache.hudi.QuickstartUtils
      dataGen = quickstart_utils.DataGenerator()

      inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))

      def create_df():
      df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
      return df

      def write_data():
      df = create_df()
      hudi_options =

      { "hoodie.table.name": table_name, "hoodie.datasource.write.recordkey.field": "uuid", "hoodie.datasource.write.table.type": "MERGE_ON_READ", # This can be either MoR or CoW and the error will still happen "hoodie.datasource.write.partitionpath.field": "partitionpath", "hoodie.datasource.write.table.name": table_name, "hoodie.datasource.write.operation": "upsert", "hoodie.table.cdc.enabled": "true", # This can be left enabled, and won"t affect anything unless actually queried as CDC "hoodie.datasource.write.precombine.field": "ts", "hoodie.upsert.shuffle.parallelism": 2, "hoodie.insert.shuffle.parallelism": 2 }

      df.write.format("hudi") \
      .options(**hudi_options) \
      .mode("overwrite") \
      .save(base_path)

      def update_data():
      updates = quickstart_utils.convertToStringList(dataGen.generateUpdates(10))
      df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
      df.write \
      .format("hudi") \
      .mode("append") \
      .save(base_path)

      def incremental_query():
      ordered_rows: list[Row] = spark.read \
      .format("hudi") \
      .load(base_path) \
      .select(col("_hoodie_commit_time").alias("commit_time")) \
      .orderBy(col("commit_time")) \
      .collect()
      commits: list[Any] = list(map(lambda row: row[0], ordered_rows))
      begin_time = commits[0]
      incremental_read_options =

      { 'hoodie.datasource.query.incremental.format': "cdc", # Uncomment this line to Query as CDC, crashes in 0.14.1 'hoodie.datasource.query.type': 'incremental', 'hoodie.datasource.read.begin.instanttime': begin_time, }

      trips_incremental_df = spark.read \
      .format("hudi") \
      .options(**incremental_read_options) \
      .load(base_path)

      1. Error also occurs when using the "from_hudi_table_changes" in 0.14.1
      2. sql_query = f""" SELECT * FROM hudi_table_changes ('{base_path}', 'cdc', 'earliest')"""
      3. trips_incremental_df = spark.sql(sql_query)
        trips_incremental_df.show()
        trips_incremental_df.printSchema()

      if _name_ == "_main_":
      write_data()
      update_data()
      incremental_query()
      ```
       

       

       

       

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              adityagoenka Aditya Goenka
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: