Spark / SPARK-49933

MERGE INTO does not push down the predicate of the WHEN NOT MATCHED BY SOURCE THEN DELETE clause (Iceberg)


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.5.2
    • Fix Version/s: None
    • Component/s: Optimizer, SQL
    • Labels: None

    Description

      When writing to an Iceberg table with the MERGE INTO command and a WHEN NOT MATCHED BY SOURCE AND ... THEN DELETE clause, Spark performs a full scan of the Iceberg table, even though the table is well partitioned and the clause's predicate references only partition columns. Without this clause, there is no full table scan.

      I’m using Spark 3.5.2 and Iceberg 1.6.0.

      An example

      Table schema is as such:
      CREATE TABLE iceberg_table (
        part_col_1 STRING,
        part_col_2 STRING,
        part_col_3 TIMESTAMP,
        some_col_x INT,
        some_col_y STRING,
        main_value_col BIGINT
      )
      USING iceberg
      PARTITIONED BY (part_col_1, part_col_2, days(part_col_3))
       
      The MERGE INTO command is as follows:
      MERGE INTO iceberg_table t
      USING new_data_to_upsert s
      ON
        t.part_col_1 = 'some_part_value_a' AND
        t.part_col_2 = 'some_part_value_b' AND
        t.part_col_3 = s.part_col_3 AND
        t.some_col_x = s.some_col_x AND
        t.some_col_y = s.some_col_y
      WHEN MATCHED THEN UPDATE SET t.main_value_col = s.main_value_col
      WHEN NOT MATCHED BY TARGET THEN INSERT *
      WHEN NOT MATCHED BY SOURCE AND
        t.part_col_1 = 'some_part_value_a' AND
        t.part_col_2 = 'some_part_value_b' AND
        t.part_col_3 >= to_timestamp('2024-09-25 00:00:00') AND
        t.part_col_3 < to_timestamp('2024-09-25 04:00:00')
      THEN DELETE
       
      The view "new_data_to_upsert" has the same columns as the table, and all of its records have

      • part_col_1 equal to 'some_part_value_a',
      • part_col_2 equal to 'some_part_value_b',
      • part_col_3 in the range [2024-09-25 00:00:00; 2024-09-25 04:00:00).

      I also call .persist() on the underlying DataFrame before calling .createOrReplaceTempView("new_data_to_upsert").

      The query plan shows the following for the iceberg_table scan:
      (1) BatchScan iceberg_table
      Output [8]: some_col_x#511, part_col_3#512, part_col_1#513, part_col_2#514, some_col_y#515, main_value_col#516L, _file#520
      iceberg_table (branch=null) [filters=true, groupedBy=]
      As far as I understand, filters=true is the problem: no predicate push-down. When I try a regular SELECT or DELETE with the same predicate, the scan shows something like:
      filters=[part_col_1 = 'some_part_value_a', part_col_2 = 'some_part_value_b', part_col_3 >= 1727222400000, part_col_3 < 1727236800000]
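For comparison, here is a standalone DELETE with the same predicate; as described above, this form does get its filters pushed down to the scan (a sketch reusing the example values):

```sql
-- Standalone DELETE over the same partition predicate; the BatchScan shows
-- filters=[part_col_1 = ..., part_col_2 = ..., part_col_3 >= ..., part_col_3 < ...]
DELETE FROM iceberg_table
WHERE part_col_1 = 'some_part_value_a'
  AND part_col_2 = 'some_part_value_b'
  AND part_col_3 >= to_timestamp('2024-09-25 00:00:00')
  AND part_col_3 <  to_timestamp('2024-09-25 04:00:00');
```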
       
      If I remove the WHEN NOT MATCHED BY SOURCE AND … THEN DELETE clause from the query, the plan is different and the BatchScan steps over the Iceberg table do have filters specified. No matter what I've tried with the predicate in this clause, the predicate is never pushed down.
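A possible workaround sketch, assuming the delete does not need to be atomic with the merge: run the DELETE as a separate statement (where the predicate is pushed down) and drop the WHEN NOT MATCHED BY SOURCE clause from the MERGE. Caveats: this commits two Iceberg snapshots instead of one, and matched rows inside the deleted range are re-inserted from the source rather than updated in place.

```sql
-- Step 1: standalone DELETE, whose predicate is pushed down to the scan.
DELETE FROM iceberg_table
WHERE part_col_1 = 'some_part_value_a'
  AND part_col_2 = 'some_part_value_b'
  AND part_col_3 >= to_timestamp('2024-09-25 00:00:00')
  AND part_col_3 <  to_timestamp('2024-09-25 04:00:00');

-- Step 2: the same MERGE without the WHEN NOT MATCHED BY SOURCE clause,
-- which produces a plan with filters on the BatchScan.
MERGE INTO iceberg_table t
USING new_data_to_upsert s
ON
  t.part_col_1 = 'some_part_value_a' AND
  t.part_col_2 = 'some_part_value_b' AND
  t.part_col_3 = s.part_col_3 AND
  t.some_col_x = s.some_col_x AND
  t.some_col_y = s.some_col_y
WHEN MATCHED THEN UPDATE SET t.main_value_col = s.main_value_col
WHEN NOT MATCHED BY TARGET THEN INSERT *;
```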
       
      I've created an issue in the Iceberg GitHub repo as well, but nobody has responded so far: https://github.com/apache/iceberg/issues/11248

          People

            Assignee: Unassigned
            Reporter: Viacheslav Inozemtsev (visorgood)
            Votes: 0
            Watchers: 1
