Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-21610

Corrupt records are not handled properly when creating a dataframe from a file

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 2.0.2, 2.2.0
    • 2.3.0
    • SQL
    • None
    • macOs Sierra 10.12.5

    Description

      Consider a jsonl file with 3 records. The third record has a value of type string, instead of int.

      echo '{"field": 1}
      {"field": 2}
      {"field": "3"}' >/tmp/sample.json
      

      Create a dataframe from this file, with a schema that contains "_corrupt_record" so that corrupt records are kept.

      import org.apache.spark.sql.types._
      
      val schema = new StructType()
        .add("field", ByteType)
        .add("_corrupt_record", StringType)
      
      val file = "/tmp/sample.json"
      
      val dfFromFile = spark.read.schema(schema).json(file)
      

      Run the following lines from a spark-shell:

      scala> dfFromFile.show(false)
      +-----+---------------+
      |field|_corrupt_record|
      +-----+---------------+
      |1    |null           |
      |2    |null           |
      |null |{"field": "3"} |
      +-----+---------------+
      
      scala> dfFromFile.filter($"_corrupt_record".isNotNull).count()
      res1: Long = 0
      
      scala> dfFromFile.filter($"_corrupt_record".isNull).count()
      res2: Long = 3
      

      The expected result is 1 corrupt record and 2 valid records, but the actual one is 0 corrupt record and 3 valid records.

      The bug is not reproduced if we create a dataframe from a RDD:

      scala> val rdd = sc.textFile(file)
      rdd: org.apache.spark.rdd.RDD[String] = /tmp/sample.json MapPartitionsRDD[92] at textFile at <console>:28
      
      scala> val dfFromRdd = spark.read.schema(schema).json(rdd)
      dfFromRdd: org.apache.spark.sql.DataFrame = [field: tinyint, _corrupt_record: string]
      
      scala> dfFromRdd.show(false)
      +-----+---------------+
      |field|_corrupt_record|
      +-----+---------------+
      |1    |null           |
      |2    |null           |
      |null |{"field": "3"} |
      +-----+---------------+
      
      scala> dfFromRdd.filter($"_corrupt_record".isNotNull).count()
      res5: Long = 1
      
      scala> dfFromRdd.filter($"_corrupt_record".isNull).count()
      res6: Long = 2
      

      Attachments

        Issue Links

          Activity

            People

              cjm Jen-Ming Chung
              dmt dmtran
              Votes:
              0 Vote for this issue
              Watchers:
              6 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: