Spark / SPARK-21610

Corrupt records are not handled properly when creating a dataframe from a file


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.0.2, 2.2.0
    • Fix Version/s: 2.3.0
    • Component/s: SQL
    • Labels:
      None
    • Environment:

      macOS Sierra 10.12.5

      Description

      Consider a JSON Lines file with 3 records, where the third record's "field" value is a string instead of an int.

      echo '{"field": 1}
      {"field": 2}
      {"field": "3"}' >/tmp/sample.json
      

      Create a dataframe from this file, with a schema that contains "_corrupt_record" so that corrupt records are kept.

      import org.apache.spark.sql.types._
      
      val schema = new StructType()
        .add("field", ByteType)
        .add("_corrupt_record", StringType)
      
      val file = "/tmp/sample.json"
      
      val dfFromFile = spark.read.schema(schema).json(file)
      

      Run the following lines from a spark-shell:

      scala> dfFromFile.show(false)
      +-----+---------------+
      |field|_corrupt_record|
      +-----+---------------+
      |1    |null           |
      |2    |null           |
      |null |{"field": "3"} |
      +-----+---------------+
      
      scala> dfFromFile.filter($"_corrupt_record".isNotNull).count()
      res1: Long = 0
      
      scala> dfFromFile.filter($"_corrupt_record".isNull).count()
      res2: Long = 3
      

      The expected result is 1 corrupt record and 2 valid records, but the actual result is 0 corrupt records and 3 valid records.
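      The PERMISSIVE-mode contract this report relies on can be sketched without Spark: each input line either parses against the schema, or is stored verbatim in the corrupt-record column. The toy parser below is illustrative only; the regex and names are not Spark internals.

      ```scala
      object PermissiveSketch {
        // One output row: either a parsed value, or the raw corrupt line.
        case class Row(field: Option[Byte], corruptRecord: Option[String])

        // Toy parser: accepts {"field": <int>} only; anything else is corrupt.
        private val Valid = """\{"field":\s*(-?\d+)\}""".r

        def parse(line: String): Row = line.trim match {
          case Valid(n) => Row(Some(n.toByte), None)   // parses against the schema
          case other    => Row(None, Some(other))      // kept verbatim as corrupt
        }

        def main(args: Array[String]): Unit = {
          val lines = Seq("""{"field": 1}""", """{"field": 2}""", """{"field": "3"}""")
          val rows  = lines.map(parse)
          val corrupt = rows.count(_.corruptRecord.isDefined)
          val valid   = rows.count(_.field.isDefined)
          println(s"corrupt=$corrupt valid=$valid") // prints corrupt=1 valid=2
        }
      }
      ```

      Under this contract, filtering on the corrupt-record column being non-null should count exactly the unparseable lines, which is what the RDD-based path below returns and the file-based path does not.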

      The bug is not reproduced if we create a dataframe from an RDD:

      scala> val rdd = sc.textFile(file)
      rdd: org.apache.spark.rdd.RDD[String] = /tmp/sample.json MapPartitionsRDD[92] at textFile at <console>:28
      
      scala> val dfFromRdd = spark.read.schema(schema).json(rdd)
      dfFromRdd: org.apache.spark.sql.DataFrame = [field: tinyint, _corrupt_record: string]
      
      scala> dfFromRdd.show(false)
      +-----+---------------+
      |field|_corrupt_record|
      +-----+---------------+
      |1    |null           |
      |2    |null           |
      |null |{"field": "3"} |
      +-----+---------------+
      
      scala> dfFromRdd.filter($"_corrupt_record".isNotNull).count()
      res5: Long = 1
      
      scala> dfFromRdd.filter($"_corrupt_record".isNull).count()
      res6: Long = 2
      
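      A possible workaround (an assumption based on the behavior above, not verified in this report): persisting the file-based DataFrame before filtering forces a full parse of every row, which should prevent the file scan from pruning the data columns away and skipping corruption detection. In a spark-shell session:

      scala> val dfCached = spark.read.schema(schema).json(file).cache()

      scala> dfCached.filter($"_corrupt_record".isNotNull).count()

      If the pruning hypothesis is right, the cached DataFrame should report 1 corrupt record, matching the RDD-based result.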


      People

      • Assignee: Jen-Ming Chung (cjm)
      • Reporter: dmtran (dmt)
      • Votes: 0
      • Watchers: 7
