Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
3.5.1, 3.4.3
Description
Spark Dataset.isEmpty behaves differently on CSV than on JSON:
- CSV → dataSet.isEmpty returns the expected value for any query
- JSON → dataSet.isEmpty throws an error when the only filter is `_corrupt_record is null`:
Tested version: Spark 3.4.3, Spark 3.5.1
Expected behaviour: either throw an error on both file types, or return the correct value on both.
In order to demonstrate the behaviour I added a unit test.
test.csv
first,second,third
test.json
{"first": "first", "second": "second", "third": "third"}
Code:
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; public class SparkIsEmptyTest { private SparkSession sparkSession; @BeforeEach void setUp() { sparkSession = getSpark(); } @AfterEach void after() { sparkSession.close(); } @Test void testDatasetIsEmptyForCsv() { var dataSet = runCsvQuery("select first, second, third, _corrupt_record from tempView where _corrupt_record is null"); assert !dataSet.isEmpty(); } @Test void testDatasetIsEmptyForJson() { var dataSet = runJsonQuery("select first, second, third, _corrupt_record from tempView where _corrupt_record is null"); assert !dataSet.isEmpty(); } @Test void testDatasetIsEmptyForJsonAnd1Eq1() { var dataSet = runJsonQuery( "select first, second, third, _corrupt_record from tempView where _corrupt_record is null and 1=1"); assert !dataSet.isEmpty(); } @Test void testDatasetIsEmptyForCsvAnd1Eq1() { var dataSet = runCsvQuery( "select first, second, third, _corrupt_record from tempView where _corrupt_record is null and 1=1"); assert !dataSet.isEmpty(); } @Test void testDatasetIsEmptyForJsonAndOtherCondition() { var dataSet = runJsonQuery("select first, second, third, _corrupt_record from tempView where _corrupt_record is null and first='first'"); assert !dataSet.isEmpty(); } @Test void testDatasetIsEmptyForCsvAndOtherCondition() { var dataSet = runCsvQuery("select first, second, third, _corrupt_record from tempView where _corrupt_record is null and first='first'"); assert !dataSet.isEmpty(); } @Test void testDatasetIsEmptyForJsonAggregation() { var dataSet = runJsonQuery("select count(1) from tempView where _corrupt_record is null"); assert !dataSet.isEmpty(); } @Test void testDatasetIsEmptyForCsvAggregation() { var dataSet = runCsvQuery("select count(1) from tempView where _corrupt_record is null"); assert !dataSet.isEmpty(); } @Test void 
testDatasetIsEmptyForJsonAggregationGroupBy() { var dataSet = runJsonQuery("select count(1) , first from tempView where _corrupt_record is null group by first"); assert !dataSet.isEmpty(); } @Test void testDatasetIsEmptyForCsvAggregationGroupBy() { var dataSet = runJsonQuery("select count(1) , first from tempView where _corrupt_record is null group by first"); assert !dataSet.isEmpty(); } private SparkSession getSpark() { return SparkSession.builder() .master("local") .appName("spark-dataset-isEmpty-issue") .config("spark.ui.enabled", "false") .getOrCreate(); } private Dataset<?> runJsonQuery(String query) { Dataset<Row> dataset = sparkSession.read() .schema("first STRING,second String, third STRING, _corrupt_record STRING") .option("columnNameOfCorruptRecord", "_corrupt_record") .json("test.json"); dataset.createOrReplaceTempView("tempView"); var dataSet = sparkSession.sql(query); dataSet.show(); return dataSet; } private Dataset<?> runCsvQuery(String query) { Dataset<Row> dataset = sparkSession.read() .schema("first STRING,second String, third STRING, _corrupt_record STRING") .option("columnNameOfCorruptRecord", "_corrupt_record") .csv("test.csv"); dataset.createOrReplaceTempView("tempView"); var dataSet = sparkSession.sql(query); dataSet.show(); return dataSet; } }
Result:
Attachments
Attachments
Issue Links
- links to