  Spark / SPARK-12052

DataFrame with self-join fails unless toDF() column aliases provided


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 1.5.1, 1.5.2, 1.6.0
    • Fix Version/s: None
    • Component/s: SQL
    • Environment: spark-shell for Spark 1.5.1, 1.5.2, and 1.6.0-Preview2

    Description

      Joining with the same DF twice appears to match on the wrong column unless the columns in the results of the first join are aliased with "toDF". Here is an example program:

      val rdd = sc.parallelize(2 to 100, 1).cache
      val numbers = rdd.map(i => (i, i*i)).toDF("n", "nsq")
      val names   = rdd.map(i => (i, i.toString)).toDF("id", "name")
      numbers.show
      names.show
      
      val good = numbers.
        join(names, numbers("n") === names("id")).toDF("n", "nsq", "id1", "name1").
        join(names, $"nsq" === names("id")).toDF("n", "nsq", "id1", "name1", "id2", "name2")
      
      // The last toDF can be omitted and you'll still get valid results.
      
      good.printSchema
      // root
      //  |-- n: integer (nullable = false)
      //  |-- nsq: integer (nullable = false)
      //  |-- id1: integer (nullable = false)
      //  |-- name1: string (nullable = true)
      //  |-- id2: integer (nullable = false)
      //  |-- name2: string (nullable = true)
      
      good.count
      // res3: Long = 9
      
      good.show
      // +---+---+---+-----+---+-----+
      // |  n|nsq|id1|name1|id2|name2|
      // +---+---+---+-----+---+-----+
      // |  2|  4|  2|    2|  4|    4|
      // |  4| 16|  4|    4| 16|   16|
      // |  6| 36|  6|    6| 36|   36|
      // |  8| 64|  8|    8| 64|   64|
      // | 10|100| 10|   10|100|  100|
      // |  3|  9|  3|    3|  9|    9|
      // |  5| 25|  5|    5| 25|   25|
      // |  7| 49|  7|    7| 49|   49|
      // |  9| 81|  9|    9| 81|   81|
      // +---+---+---+-----+---+-----+
      
      val bad = numbers.
        join(names, numbers("n") === names("id")).
        join(names, $"nsq" === names("id"))
      
      bad.printSchema
      // root
      //  |-- n: integer (nullable = false)
      //  |-- nsq: integer (nullable = false)
      //  |-- id: integer (nullable = false)
      //  |-- name: string (nullable = true)
      //  |-- id: integer (nullable = false)
      //  |-- name: string (nullable = true)
      
      bad.count
      // res6: Long = 0
      
      bad.show
      // +---+---+---+----+---+----+
      // |  n|nsq| id|name| id|name|
      // +---+---+---+----+---+----+
      // +---+---+---+----+---+----+
      
      // Curiously, if you change the original rdd line to this:
      //   val rdd = sc.parallelize(1 to 100, 1).cache
      // then the first record for numbers is (1,1), and bad will have the following
      // content:
      // +---+---+---+----+---+----+
      // |  n|nsq| id|name| id|name|
      // +---+---+---+----+---+----+
      // |  1|  1|  1|   1|  1|   1|
      // |  1|  1|  1|   1|  2|   2|
      // |  1|  1|  1|   1|  3|   3|
      // |  1|  1|  1|   1|  4|   4|
      // |  1|  1|  1|   1|  5|   5|
      // |  1|  1|  1|   1|  6|   6|
      // |  1|  1|  1|   1|  7|   7|
      // |  1|  1|  1|   1|  8|   8|
      // |  1|  1|  1|   1|  9|   9|
      // |  1|  1|  1|   1| 10|  10|
      // |  1|  1|  1|   1| 11|  11|
      // |  1|  1|  1|   1| 12|  12|
      // |  1|  1|  1|   1| 13|  13|
      // |  1|  1|  1|   1| 14|  14|
      // |  1|  1|  1|   1| 15|  15|
      // |  1|  1|  1|   1| 16|  16|
      // |  1|  1|  1|   1| 17|  17|
      // |  1|  1|  1|   1| 18|  18|
      // |  1|  1|  1|   1| 19|  19|
      // |  1|  1|  1|   1| 20|  20|
      // ...
      // |  1|  1|  1|   1| 96|  96|
      // |  1|  1|  1|   1| 97|  97|
      // |  1|  1|  1|   1| 98|  98|
      // |  1|  1|  1|   1| 99|  99|
      // |  1|  1|  1|   1|100| 100|
      // +---+---+---+----+---+----+
      //
      // This makes no sense to me.
      
      // Breaking it up, so we can reference 'bad2("nsq")' doesn't help:
      val bad2 = numbers.
        join(names, numbers("n") === names("id"))
      val bad3 = bad2.
        join(names, bad2("nsq") === names("id"))
      
      bad3.printSchema
      bad3.count
      bad3.show
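
      An alternative to renaming every column with toDF is to give each use of names its own alias and to qualify the join keys by that alias. The sketch below assumes the same spark-shell session as above (sc and the SQL implicits in scope); the alias names names1 and names2 are hypothetical, and the approach has not been verified against the affected versions listed in this issue.

```scala
// Assumes a spark-shell session, where `sc` and the SQL implicits
// (toDF, $"...") are already in scope, as in the program above.
val rdd = sc.parallelize(2 to 100, 1).cache
val numbers = rdd.map(i => (i, i*i)).toDF("n", "nsq")
val names   = rdd.map(i => (i, i.toString)).toDF("id", "name")

// Hypothetical alias names: each use of `names` gets its own qualifier,
// so the two `id` columns can be told apart by name.
val names1 = names.as("names1")
val names2 = names.as("names2")

val aliased = numbers.
  join(names1, $"n" === $"names1.id").
  join(names2, $"nsq" === $"names2.id")

aliased.printSchema
aliased.count
aliased.show
```

      The schema will still contain two columns named id and two named name, so later references need the alias qualifier (for example $"names2.name").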
      

      Note the embedded comment: if you start with 1 to 100, numbers contains the record (1, 1), in which both columns are 1. This yields the strange results shown in the comment, suggesting that the second join was actually done on the wrong column of the first result set. However, the output still makes no sense to me; based on the results you get from the first join alone, it's "impossible" to get this output!
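
      One reading that would account for both outputs is sketched below in plain Scala collections, with no Spark involved. It is an assumption, not something confirmed in this report: names("id") in the second join condition may be resolved to the id column contributed by the first join, so the condition only filters the first result and no longer constrains the second copy of names. The object and method names (SelfJoinSketch, intended, hypothesised) are made up for illustration.

```scala
// Plain Scala collections standing in for the DataFrames; no Spark required.
object SelfJoinSketch {
  type Joined = (Int, Int, Int, String, Int, String)

  // Rows of `numbers` (n, nsq) and `names` (id, name) for a given range.
  def numbers(r: Range): Seq[(Int, Int)] = r.map(i => (i, i * i))
  def names(r: Range): Seq[(Int, String)] = r.map(i => (i, i.toString))

  // First join: numbers.n == names.id.
  def firstJoin(r: Range): Seq[(Int, Int, Int, String)] =
    for {
      (n, nsq) <- numbers(r)
      (id, name) <- names(r)
      if n == id
    } yield (n, nsq, id, name)

  // Intended second join: nsq is matched against the id of the second copy of names.
  def intended(r: Range): Seq[Joined] =
    for {
      (n, nsq, id1, name1) <- firstJoin(r)
      (id2, name2) <- names(r)
      if nsq == id2
    } yield (n, nsq, id1, name1, id2, name2)

  // Assumed (unconfirmed) behaviour: the condition compares nsq with id1,
  // so every surviving row is paired with every row of names.
  def hypothesised(r: Range): Seq[Joined] =
    for {
      (n, nsq, id1, name1) <- firstJoin(r)
      (id2, name2) <- names(r)
      if nsq == id1
    } yield (n, nsq, id1, name1, id2, name2)

  def main(args: Array[String]): Unit = {
    println(intended(2 to 100).size)     // 9, as in good.count
    println(hypothesised(2 to 100).size) // 0, as in bad.count
    println(hypothesised(1 to 100).size) // 100 rows, all starting with (1, 1, 1, "1")
  }
}
```

      Under this assumption only rows with n * n == n survive the condition, which is no row for 2 to 100 and only (1, 1) for 1 to 100, and that single row is then paired with all 100 names.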

      Note: Could be related to the following issues:


            People

              Assignee: Unassigned
              Reporter: Dean Wampler (deanwampler)
              Votes: 0
              Watchers: 5
