SPARK-17061: Incorrect results returned following a join of two datasets and a map step where total number of columns >100


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 2.0.0, 2.0.1
    • Fix Version/s: 2.0.1, 2.1.0
    • Component/s: Spark Core

    Description

      We have hit a consistent bug when working with a dataset that has more than 100 columns. I am raising this as a blocker because Spark returns the WRONG results rather than erroring out, leading to data integrity issues.

      I have put together the following test case, which demonstrates the issue (it will run in spark-shell). In this example I am joining a dataset with a large number of fields onto another dataset.

      The join works fine, and if you show the joined dataset you get the expected result. However, if you run a map step over it, the sequence from the right-hand dataset comes back corrupted: every element is replaced by the last value.

      Whilst this test may seem a rather contrived example, what we are doing here is a very standard analytical pattern (a sketch of it follows this list). My original code was designed to:

      • take a dataset of child records
      • groupByKey up to the parent: giving a Dataset of (ParentID, Seq[Children])
      • join the children onto the parent by parentID: giving a Dataset of ((Parent), (ParentID, Seq[Children]))
      • map over the result to give a tuple of (Parent,Seq[Children])
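
      For reference, here is a minimal sketch of that pattern. It uses hypothetical Parent/Child case classes (not the ones from the test case below) and only shows the shape of the pipeline; it does not by itself reproduce the 100-column corruption.

      case class Child(parentId: Int, value: String)
      case class Parent(parentId: Int, name: String)

      import spark.implicits._

      val children = Seq(Child(1, "a"), Child(1, "b")).toDS()
      val parents  = Seq(Parent(1, "p1")).toDS()

      // groupByKey up to the parent: Dataset[(Int, Seq[Child])]
      val childrenByParent = children.groupByKey(_.parentId).mapGroups((id, rows) => (id, rows.toSeq))

      // join the grouped children onto the parent by parentId
      val joined = parents.as("P").joinWith(childrenByParent.as("C"), $"P.parentId" === $"C._1")

      // map over the result to give (Parent, Seq[Child]); with 100+ total columns this
      // map step is where the right-hand sequence comes back corrupted
      val result = joined.map { case (parent, (_, kids)) => (parent, kids) }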

      Notes:

      • The issue is resolved by having fewer fields - as soon as we go to 100 or fewer the integrity issue goes away. Try removing one of the fields from BigCaseClass below
      • The issue arises based on the total number of fields in the resulting dataset. Below I have one small case class and one big case class, but two case classes of 50 fields each would give the same issue
      • The issue occurs when the case class being joined on (on the right) contains a field of case class type; it doesn't occur with a Seq[String] (see the variant sketched after these notes)
      • If I go back to an RDD for the map step after the join I can work around the issue, but I lose all the benefits of Datasets (a sketch of this workaround follows the test case below)
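
      To make the Seq[String] note concrete, here is a hypothetical variant (it reuses the bigCCDS dataset defined in the test case below, so run it after that code): with plain strings on the right-hand side instead of the nested Name case class, the mapped output is not corrupted.

      // Hypothetical Seq[String] variant of SmallCaseClass; not part of the original report
      case class SmallStringClass(joinkey: Integer, names: Seq[String])

      val smallStrDS = spark.createDataset(Seq(
        SmallStringClass(1, Seq("Jamie", "Ian", "Dave", "Will"))))

      // Same join and map as the main test case, but the four names survive the map step
      bigCCDS.as("A").joinWith(smallStrDS.as("B"), $"A.field1" === $"B.joinkey", "LEFT")
        .map(identity).show(false)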

      Scala code test case:

      import spark.implicits._  // auto-imported in spark-shell; add it explicitly if running elsewhere

      case class Name(name: String)
      case class SmallCaseClass (joinkey: Integer, names: Seq[Name])
      case class BigCaseClass (field1: Integer,field2: Integer,field3: Integer,field4: Integer,field5: Integer,field6: Integer,field7: Integer,field8: Integer,field9: Integer,field10: Integer,field11: Integer,field12: Integer,field13: Integer,field14: Integer,field15: Integer,field16: Integer,field17: Integer,field18: Integer,field19: Integer,field20: Integer,field21: Integer,field22: Integer,field23: Integer,field24: Integer,field25: Integer,field26: Integer,field27: Integer,field28: Integer,field29: Integer,field30: Integer,field31: Integer,field32: Integer,field33: Integer,field34: Integer,field35: Integer,field36: Integer,field37: Integer,field38: Integer,field39: Integer,field40: Integer,field41: Integer,field42: Integer,field43: Integer,field44: Integer,field45: Integer,field46: Integer,field47: Integer,field48: Integer,field49: Integer,field50: Integer,field51: Integer,field52: Integer,field53: Integer,field54: Integer,field55: Integer,field56: Integer,field57: Integer,field58: Integer,field59: Integer,field60: Integer,field61: Integer,field62: Integer,field63: Integer,field64: Integer,field65: Integer,field66: Integer,field67: Integer,field68: Integer,field69: Integer,field70: Integer,field71: Integer,field72: Integer,field73: Integer,field74: Integer,field75: Integer,field76: Integer,field77: Integer,field78: Integer,field79: Integer,field80: Integer,field81: Integer,field82: Integer,field83: Integer,field84: Integer,field85: Integer,field86: Integer,field87: Integer,field88: Integer,field89: Integer,field90: Integer,field91: Integer,field92: Integer,field93: Integer,field94: Integer,field95: Integer,field96: Integer,field97: Integer,field98: Integer,field99: Integer)

      val bigCC=Seq(BigCaseClass(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99))

      val smallCC=Seq(SmallCaseClass(1,Seq(
      Name("Jamie"),
      Name("Ian"),
      Name("Dave"),
      Name("Will")
      )))

      val bigCCDS = spark.createDataset(spark.sparkContext.parallelize(bigCC))
      val smallCCDS = spark.createDataset(spark.sparkContext.parallelize(smallCC))

      // LEFT outer joinWith gives a Dataset[(BigCaseClass, SmallCaseClass)]
      val joined_test = bigCCDS.as("A").joinWith(smallCCDS.as("B"), $"A.field1" === $"B.joinkey", "LEFT")

      /* This next step is fine - it shows all 4 names:
         [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99]
         [1,WrappedArray([Jamie], [Ian], [Dave], [Will])]
      */
      joined_test.show(false)

      /* This one ends up repeating Will - I did the simplest map step possible here:
         [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99]
         [1,WrappedArray([Will], [Will], [Will], [Will])]
      */
      joined_test.map(identity).show(false)

      /* This one works because the mapped result has fewer than 100 fields:
         [Jamie], [Ian], [Dave], [Will]
      */
      joined_test.map(_._2).show(false)
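
      As a sketch of the RDD workaround mentioned in the notes above (my assumption of what that looks like, using the joined_test dataset defined above): dropping to the underlying RDD for the map step returns the names intact, at the cost of the Dataset encoder and optimiser benefits.

      // Workaround sketch: run the map on the RDD instead of the Dataset
      // (with the LEFT join the right side can be null for unmatched keys, hence the Option)
      val viaRdd = joined_test.rdd.map { case (big, small) =>
        (big, Option(small).map(_.names).getOrElse(Seq.empty[Name]))
      }
      viaRdd.take(1).foreach(println)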


People

    Assignee: Liwei Lin (proflin)
    Reporter: Jamie Hutton (jamiehutton)
