SPARK-27375

cache not working after discretizer.fit(df).transform(df)

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Cannot Reproduce
    • Affects Version/s: 2.3.0
    • Fix Version/s: None
    • Component/s: Examples
    • Labels:
      None

      Description

      The example below reproduces the problem.

      If the cache worked, col(r1) would equal col(r2) in the output of dfj.show(). However, once the DataFrame has gone through the discretizer's fit and transform, col(r1) and col(r2) differ.

       

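      import random
      from pyspark.ml.feature import QuantileDiscretizer
      from pyspark.sql.functions import udf
      from pyspark.sql.types import IntegerType

      # Assumes an active SparkSession named `spark` (e.g. the pyspark shell).
      spark.catalog.clearCache()
      random.seed(123)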
      
      @udf(IntegerType())
      def ri():
          return random.choice([1,2,3,4,5,6,7,8,9])
      
      df = spark.range(100).repartition("id")
      
      # If the discretizer step below is removed, col(r1) equals col(r2).
      discretizer = QuantileDiscretizer(numBuckets=3, inputCol="id", outputCol="quantileNo")
      df = discretizer.fit(df).transform(df)
      
      # Uncommenting the next line (copying df through its RDD) also makes col(r1) equal col(r2).
      # df = df.rdd.toDF()
      
      
      df = df.withColumn("r", ri()).cache()
      df1 = df.withColumnRenamed("r", "r1")
      df2 = df.withColumnRenamed("r", "r2")
      
      df1.join(df2, "id").explain()
      dfj = df1.join(df2, "id")
      dfj.select("id", "r1", "r2").show(5)
      
      
       
      
      The result is shown below: col(r1) and col(r2) differ.
      The physical plan contains no InMemoryTableScan node, so the join does not read from the cache and ri() is evaluated separately on each side.
      
      Either of two changes makes col(r1) and col(r2) identical: adding df = df.rdd.toDF() before creating df1 and df2, or removing the discretizer fit and transform step.
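Why a missed cache leads to different columns can be illustrated without Spark. The sketch below is plain Python, not Spark's execution model: the helper names `recomputed_columns` and `materialized_columns` are hypothetical, and `ri()` merely imitates the UDF from the report. It contrasts evaluating the random function once per row and sharing the result with evaluating it independently for each side of the join.

```python
import random


def ri():
    # Stand-in for the UDF in the report: a fresh random value on every call.
    return random.choice([1, 2, 3, 4, 5, 6, 7, 8, 9])


def recomputed_columns(ids):
    # No effective cache: each side of the join evaluates ri() on its own.
    r1 = {i: ri() for i in ids}
    r2 = {i: ri() for i in ids}
    return r1, r2


def materialized_columns(ids):
    # Effective cache: ri() runs once per row and both sides read that result.
    r = {i: ri() for i in ids}
    return dict(r), dict(r)


if __name__ == "__main__":
    random.seed(123)
    ids = range(100)
    r1, r2 = materialized_columns(ids)
    print(r1 == r2)  # True: both columns come from the same stored values
    r1, r2 = recomputed_columns(ids)
    print(r1 == r2)  # expected to be False: the two sides drew independently
```

The second case corresponds to the plan in this report, where BatchEvalPython [ri()] appears in both branches of the join.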
      
       
      
      == Physical Plan ==
      *(4) Project [id#15612L, quantileNo#15622, r1#15645, quantileNo#15653, r2#15649]
      +- *(4) BroadcastHashJoin [id#15612L], [id#15655L], Inner, BuildRight
         :- *(4) Project [id#15612L, UDF:bucketizer_0(cast(id#15612L as double)) AS quantileNo#15622, pythonUDF0#15661 AS r1#15645]
         :  +- BatchEvalPython [ri()], [id#15612L, pythonUDF0#15661]
         :     +- Exchange hashpartitioning(id#15612L, 24)
         :        +- *(1) Range (0, 100, step=1, splits=6)
         +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
            +- *(3) Project [id#15655L, UDF:bucketizer_0(cast(id#15655L as double)) AS quantileNo#15653, pythonUDF0#15662 AS r2#15649]
               +- BatchEvalPython [ri()], [id#15655L, pythonUDF0#15662]
                  +- ReusedExchange [id#15655L], Exchange hashpartitioning(id#15612L, 24)
      +---+---+---+
      | id| r1| r2|
      +---+---+---+
      | 28|  9|  3|
      | 30|  3|  6|
      | 88|  1|  9|
      | 67|  3|  3|
      | 66|  1|  5|
      +---+---+---+
      only showing top 5 rows
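Whether a plan reads from the cache can be checked mechanically by searching the explain() text for an InMemoryTableScan node, which is how Spark 2.x-style plans show a read from a cached DataFrame. The following is a minimal sketch: `plan_uses_cache` is a hypothetical helper, `reported_plan` is an excerpt of the plan above, and `hypothetical_cached_plan` is an invented illustration of a cached read, not output from this issue.

```python
def plan_uses_cache(plan_text):
    """Return True if the physical plan text contains a cached-data scan."""
    # Assumption: the plan is in the default explain() text format, where a
    # read from a cached DataFrame appears as an InMemoryTableScan node.
    return "InMemoryTableScan" in plan_text


# Excerpt of the plan from this report (top of the tree and the left branch).
reported_plan = """
*(4) Project [id#15612L, quantileNo#15622, r1#15645, quantileNo#15653, r2#15649]
+- *(4) BroadcastHashJoin [id#15612L], [id#15655L], Inner, BuildRight
   :  +- BatchEvalPython [ri()], [id#15612L, pythonUDF0#15661]
   :     +- Exchange hashpartitioning(id#15612L, 24)
   :        +- *(1) Range (0, 100, step=1, splits=6)
"""

# Hypothetical plan fragment, made up for illustration only.
hypothetical_cached_plan = """
*(1) Project [id#0L, r#1 AS r1#2]
+- InMemoryTableScan [id#0L, r#1]
"""

if __name__ == "__main__":
    print(plan_uses_cache(reported_plan))             # False
    print(plan_uses_cache(hypothetical_cached_plan))  # True
```

A string check like this only shows whether the plan reads cached data; it does not explain why the cached relation failed to match.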
      
       
      
      

       

            People

            • Assignee: Unassigned
            • Reporter: Zhenyi Lin (lzhenyi)