Uploaded image for project: 'Apache Hudi'
  1. Apache Hudi
  2. HUDI-4861

Relax MERGE INTO restrictions to permit casting of the matching condition

    XMLWordPrintableJSON

Details

    Description

      Reported by user:

      https://github.com/apache/hudi/issues/6626

       

      Following code

       

           target_df = spark.read.format("hudi").load(basePath)
          print("###################################")
          print(target_df.printSchema())
          # # target_df.show()
          target_datatype_map = {}
          for name, dtype in target_df.dtypes:
              target_datatype_map[name] = dtype
          print(str(target_datatype_map))
          print("###################################")
      
          for col in columns:
              if has_column(deflateDf, col):
                  deflateDf = deflateDf.withColumn(col, F.col(col))
              else:
                  deflateDf = deflateDf.withColumn(col, F.lit(None))
          deflateDf.createOrReplaceTempView("deflate_table")
          create_sql = "create table RESULTDATA using hudi location '/tmp/RESULTDATA_mor'"
          spark.sql(create_sql)
          
          merge_sql = """
          merge into RESULTDATA as target
              using (
                  select * from deflate_table as deflate
              )
              on target._context_id_ = deflate._context_id_ and target.id = deflate.id
              when matched
              then update set
              target.CREATED = cast(if(array_contains(deflate.changed_cols, 'CREATED'), deflate.CREATED, target.CREATED) as timestamp),target.CREATEDBY = cast(if(array_contains(deflate.changed_cols, 'CREATEDBY'), deflate.CREATEDBY, target.CREATEDBY) as string),target.DELETED = cast(if(array_contains(deflate.changed_cols, 'DELETED'), deflate.DELETED, target.DELETED) as timestamp),target.DELETEDBY = cast(if(array_contains(deflate.changed_cols, 'DELETEDBY'), deflate.DELETEDBY, target.DELETEDBY) as string),target.EXPIRATIONDATE = cast(if(array_contains(deflate.changed_cols, 'EXPIRATIONDATE'), deflate.EXPIRATIONDATE, target.EXPIRATIONDATE) as timestamp),target.ID = cast(if(array_contains(deflate.changed_cols, 'ID'), deflate.ID, target.ID) as decimal(12,0)),target.KEY = cast(if(array_contains(deflate.changed_cols, 'KEY'), deflate.KEY, target.KEY) as string),target.LASTMODIFIED = cast(if(array_contains(deflate.changed_cols, 'LASTMODIFIED'), deflate.LASTMODIFIED, target.LASTMODIFIED) as timestamp),target.LASTMODIFIEDBY = cast(if(array_contains(deflate.changed_cols, 'LASTMODIFIEDBY'), deflate.LASTMODIFIEDBY, target.LASTMODIFIEDBY) as string),target.ORDERING = cast(if(array_contains(deflate.changed_cols, 'ORDERING'), deflate.ORDERING, target.ORDERING) as decimal(12,0)),target.RESULTID = cast(if(array_contains(deflate.changed_cols, 'RESULTID'), deflate.RESULTID, target.RESULTID) as decimal(12,0)),target.REPORTINGPERIODTYPE = cast(if(array_contains(deflate.changed_cols, 'REPORTINGPERIODTYPE'), deflate.REPORTINGPERIODTYPE, target.REPORTINGPERIODTYPE) as string),target.RESULTDATE = cast(if(array_contains(deflate.changed_cols, 'RESULTDATE'), deflate.RESULTDATE, target.RESULTDATE) as timestamp),target.SATISFYINGNUMERATOR = cast(if(array_contains(deflate.changed_cols, 'SATISFYINGNUMERATOR'), deflate.SATISFYINGNUMERATOR, target.SATISFYINGNUMERATOR) as decimal(12,0)),target.VALUE = cast(if(array_contains(deflate.changed_cols, 'VALUE'), deflate.VALUE, target.VALUE) as string),target._ETL_RUN_ID_ = cast(if(array_contains(deflate.changed_cols, '_ETL_RUN_ID_'), deflate._ETL_RUN_ID_, target._ETL_RUN_ID_) as decimal(38,0)),target._ETL_MODIFIED_ = cast(if(array_contains(deflate.changed_cols, '_ETL_MODIFIED_'), deflate._ETL_MODIFIED_, target._ETL_MODIFIED_) as timestamp),target._EXTRACTED_ = cast(if(array_contains(deflate.changed_cols, '_EXTRACTED_'), deflate._EXTRACTED_, target._EXTRACTED_) as timestamp),target._SOURCE_EXTRACTED_ = cast(if(array_contains(deflate.changed_cols, '_SOURCE_EXTRACTED_'), deflate._SOURCE_EXTRACTED_, target._SOURCE_EXTRACTED_) as timestamp),target._LAST_MODIFIED_SEQ_ = cast(if(array_contains(deflate.changed_cols, '_LAST_MODIFIED_SEQ_'), deflate._LAST_MODIFIED_SEQ_, target._LAST_MODIFIED_SEQ_) as decimal(38,0)),target._SCHEMA_CLASS_ = cast(if(array_contains(deflate.changed_cols, '_SCHEMA_CLASS_'), deflate._SCHEMA_CLASS_, target._SCHEMA_CLASS_) as string),target._CONTEXT_ID_ = cast(if(array_contains(deflate.changed_cols, '_CONTEXT_ID_'), deflate._CONTEXT_ID_, target._CONTEXT_ID_) as decimal(12,0)),target._IS_DELETED_ = cast(if(array_contains(deflate.changed_cols, '_IS_DELETED_'), deflate._IS_DELETED_, target._IS_DELETED_) as boolean)
              when not matched
              then insert
              (CREATED,CREATEDBY,DELETED,DELETEDBY,EXPIRATIONDATE,ID,KEY,LASTMODIFIED,LASTMODIFIEDBY,ORDERING,RESULTID,REPORTINGPERIODTYPE,RESULTDATE,SATISFYINGNUMERATOR,VALUE,_ETL_RUN_ID_,_ETL_MODIFIED_,_EXTRACTED_,_SOURCE_EXTRACTED_,_LAST_MODIFIED_SEQ_,_SCHEMA_CLASS_,_CONTEXT_ID_,_IS_DELETED_) values (cast(deflate.CREATED as timestamp),cast(deflate.CREATEDBY as string),cast(deflate.DELETED as timestamp),cast(deflate.DELETEDBY as string),cast(deflate.EXPIRATIONDATE as timestamp),cast(deflate.ID as decimal(12,0)),cast(deflate.KEY as string),cast(deflate.LASTMODIFIED as timestamp),cast(deflate.LASTMODIFIEDBY as string),cast(deflate.ORDERING as decimal(12,0)),cast(deflate.RESULTID as decimal(12,0)),cast(deflate.REPORTINGPERIODTYPE as string),cast(deflate.RESULTDATE as timestamp),cast(deflate.SATISFYINGNUMERATOR as decimal(12,0)),cast(deflate.VALUE as string),cast(deflate._ETL_RUN_ID_ as decimal(38,0)),cast(deflate._ETL_MODIFIED_ as timestamp),cast(deflate._EXTRACTED_ as timestamp),cast(deflate._SOURCE_EXTRACTED_ as timestamp),cast(deflate._LAST_MODIFIED_SEQ_ as decimal(38,0)),cast(deflate._SCHEMA_CLASS_ as string),cast(deflate._CONTEXT_ID_ as decimal(12,0)),cast(deflate._IS_DELETED_ as boolean))
              """
          spark.sql(merge_sql) 

       

       

      Results in the exception being thrown:

       

      /09/07 18:47:12 INFO HoodieTableMetaClient: Finished Loading Table of type MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from file:///tmp/RESULTDATA_mor
      22/09/07 18:47:12 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[20220907150126010__deltacommit__COMPLETED]}
      Traceback (most recent call last):
        File "/Users/parunkarthick/cdc-poc/main.py", line 971, in <module>
          process_table(deflate_df, tableName, table_cols[tableNames[0]], concurrent_write_enabled, delete_insert_enabled)
        File "/Users/parunkarthick/cdc-poc/main.py", line 767, in process_table
          merge_into_hudi(table_name, df, table_cols)
        File "/Users/parunkarthick/cdc-poc/main.py", line 599, in merge_into_hudi
          target_rows = spark.sql(sql)
        File "/opt/homebrew/Cellar/apache-spark/3.1.2/libexec/python/lib/pyspark.zip/pyspark/sql/session.py", line 723, in sql
        File "/opt/homebrew/Cellar/apache-spark/3.1.2/libexec/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
        File "/opt/homebrew/Cellar/apache-spark/3.1.2/libexec/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
      pyspark.sql.utils.AnalysisException: Invalidate Merge-On condition: (CAST(target.`id` AS DECIMAL(20,0)) = CAST(CAST(deflate.`id` AS DECIMAL(20,0)) AS DECIMAL(20,0))).The validate condition should be 'targetColumn = sourceColumnExpression', e.g. t.id = s.id and t.dt = from_unixtime(s.ts) 

       

      This occurs due to the fact that current impl of MergeIntoHoodieTableCommand restricts target table's primary key to be just an AttributeReference, which in this case is wrapped into a Cast

       

      Attachments

        Issue Links

          Activity

            People

              alexey.kudinkin Alexey Kudinkin
              alexey.kudinkin Alexey Kudinkin
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: