Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-49016

Spark DataSet.isEmpty behaviour is different on CSV than JSON

    XMLWordPrintableJSON

Details

    Description

      The behaviour of Spark's Dataset.isEmpty differs between CSV and JSON sources:

      • CSV → dataSet.isEmpty returns a value for every query
      • JSON → dataSet.isEmpty throws an error when the only filter is "_corrupt_record is null"

      Tested version: Spark 3.4.3, Spark 3.5.1

      Expected behaviour: either throw an error for both file types, or return the correct value for both

       

      To demonstrate the behaviour, I added a unit test:

       
      test.csv

      first,second,third

      test.json

      {"first": "first", "second": "second", "third": "third"}

      Code:

      import org.apache.spark.sql.Dataset;
      import org.apache.spark.sql.Row;
      import org.apache.spark.sql.SparkSession;
      import org.junit.jupiter.api.AfterEach;
      import org.junit.jupiter.api.BeforeEach;
      import org.junit.jupiter.api.Test;
      
      /**
       * Reproduction for SPARK-49016: compares {@code Dataset.isEmpty()} on CSV and JSON
       * sources that are read with an explicit schema containing a corrupt-record column.
       *
       * <p>Every scenario exists as a JSON/CSV pair running the same SQL text, so that any
       * difference in outcome can be attributed to the file format alone. The tests expect
       * {@code test.json} and {@code test.csv} to be resolvable from the working directory.
       *
       * <p>NOTE(review): the checks use plain {@code assert}, so they only fire when the JVM
       * runs with assertions enabled ({@code -ea}) — confirm the test runner does so.
       */
      public class SparkIsEmptyTest {

          /** Schema shared by both readers; the last column receives unparsable input. */
          private static final String SCHEMA =
                  "first STRING,second String, third STRING, _corrupt_record STRING";

          /** Name of the column configured to hold corrupt records. */
          private static final String CORRUPT_RECORD_COLUMN = "_corrupt_record";

          /** Temp view name referenced by every SQL statement in this class. */
          private static final String VIEW_NAME = "tempView";

          private SparkSession sparkSession;

          @BeforeEach
          void setUp() {
              sparkSession = getSpark();
          }

          @AfterEach
          void after() {
              sparkSession.close();
          }

          /** CSV source, filter consisting only of {@code _corrupt_record is null}. */
          @Test
          void testDatasetIsEmptyForCsv() {
              var dataSet = runCsvQuery("select first, second, third, _corrupt_record from tempView where _corrupt_record is null");

              assert !dataSet.isEmpty();
          }

          /** JSON source, filter consisting only of {@code _corrupt_record is null}. */
          @Test
          void testDatasetIsEmptyForJson() {
              var dataSet = runJsonQuery("select first, second, third, _corrupt_record from tempView where _corrupt_record is null");

              assert !dataSet.isEmpty();
          }

          /** JSON source, corrupt-record filter combined with the trivial predicate {@code 1=1}. */
          @Test
          void testDatasetIsEmptyForJsonAnd1Eq1() {
              var dataSet = runJsonQuery(
                      "select first, second, third, _corrupt_record from tempView where _corrupt_record is null and 1=1");

              assert !dataSet.isEmpty();
          }

          /** CSV source, corrupt-record filter combined with the trivial predicate {@code 1=1}. */
          @Test
          void testDatasetIsEmptyForCsvAnd1Eq1() {
              var dataSet = runCsvQuery(
                      "select first, second, third, _corrupt_record from tempView where _corrupt_record is null and 1=1");

              assert !dataSet.isEmpty();
          }

          /** JSON source, corrupt-record filter combined with a predicate on a data column. */
          @Test
          void testDatasetIsEmptyForJsonAndOtherCondition() {
              var dataSet = runJsonQuery("select first, second, third, _corrupt_record from tempView where _corrupt_record is null and first='first'");

              assert !dataSet.isEmpty();
          }

          /** CSV source, corrupt-record filter combined with a predicate on a data column. */
          @Test
          void testDatasetIsEmptyForCsvAndOtherCondition() {
              var dataSet = runCsvQuery("select first, second, third, _corrupt_record from tempView where _corrupt_record is null and first='first'");

              assert !dataSet.isEmpty();
          }

          /** JSON source, global {@code count(1)} over the corrupt-record filter. */
          @Test
          void testDatasetIsEmptyForJsonAggregation() {
              var dataSet = runJsonQuery("select count(1) from tempView where _corrupt_record is null");

              assert !dataSet.isEmpty();
          }

          /** CSV source, global {@code count(1)} over the corrupt-record filter. */
          @Test
          void testDatasetIsEmptyForCsvAggregation() {
              var dataSet = runCsvQuery("select count(1) from tempView where _corrupt_record is null");

              assert !dataSet.isEmpty();
          }

          /** JSON source, grouped {@code count(1)} over the corrupt-record filter. */
          @Test
          void testDatasetIsEmptyForJsonAggregationGroupBy() {
              var dataSet = runJsonQuery("select count(1) , first from tempView where _corrupt_record is null group by first");

              assert !dataSet.isEmpty();
          }

          /** CSV source, grouped {@code count(1)} over the corrupt-record filter. */
          @Test
          void testDatasetIsEmptyForCsvAggregationGroupBy() {
              // Fixed: this previously called runJsonQuery, so the CSV group-by case was
              // never executed and the JSON case ran twice.
              var dataSet = runCsvQuery("select count(1) , first from tempView where _corrupt_record is null group by first");

              assert !dataSet.isEmpty();
          }

          /** Builds (or reuses) a local session with the web UI switched off. */
          private SparkSession getSpark() {
              return SparkSession.builder()
                      .master("local")
                      .appName("spark-dataset-isEmpty-issue")
                      .config("spark.ui.enabled", "false")
                      .getOrCreate();
          }

          /**
           * Reads {@code test.json} with the shared schema and runs {@code query} against it.
           *
           * @param query SQL text that selects from {@code tempView}
           * @return the dataset produced by the query
           */
          private Dataset<?> runJsonQuery(String query) {
              Dataset<Row> source = sparkSession.read()
                      .schema(SCHEMA)
                      .option("columnNameOfCorruptRecord", CORRUPT_RECORD_COLUMN)
                      .json("test.json");

              return runQuery(source, query);
          }

          /**
           * Reads {@code test.csv} with the shared schema and runs {@code query} against it.
           *
           * @param query SQL text that selects from {@code tempView}
           * @return the dataset produced by the query
           */
          private Dataset<?> runCsvQuery(String query) {
              Dataset<Row> source = sparkSession.read()
                      .schema(SCHEMA)
                      .option("columnNameOfCorruptRecord", CORRUPT_RECORD_COLUMN)
                      .csv("test.csv");

              return runQuery(source, query);
          }

          /**
           * Registers {@code source} as the temp view, executes {@code query}, and prints the
           * result via {@code show()} before handing it back to the caller.
           *
           * @param source dataset to expose under the temp view name
           * @param query  SQL text to execute
           * @return the dataset produced by the query
           */
          private Dataset<?> runQuery(Dataset<Row> source, String query) {
              source.createOrReplaceTempView(VIEW_NAME);
              var result = sparkSession.sql(query);

              // Kept from the original reproduction: show() runs before the caller's isEmpty().
              result.show();

              return result;
          }
      }

      Result:

       

      Attachments

        1. image-2024-07-26-15-50-24-308.png
          65 kB
          Marius Butan
        2. image-2024-07-26-15-50-10-280.png
          58 kB
          Marius Butan

        Issue Links

          Activity

            People

              Wayne Guo Wei Guo
              marius.butan1992 Marius Butan
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: