Details
Type: Bug
Status: Open
Priority: Minor
Resolution: Unresolved
Affects Version/s: 3.3.1
Fix Version/s: None
Environment
We are using Azure Synapse Analytics. Within that, we have provisioned a Spark Pool with 101 nodes: 100 nodes are used for the executors and 1 node is for the driver. Each node is what Synapse Analytics calls a "Memory Optimized Medium size node", meaning each node has 8 vCores and 64 GB of memory. The Spark Pool does not do dynamic allocation of executors (all 101 nodes are created at the start and remain present throughout the Spark job). Synapse has something called "Intelligent Cache," but we disabled it (set to 0%). The nodes all use Spark 3.3.1.5.2-90111858. If you need details on any specific Spark settings, I can get that for you; mostly we are just using the defaults.
Description
Bug Context
Hello! I would like to report a bug that my team noticed while using Spark (please see the Environment section for our exact setup).
The application we built is meant to convert a large number of JSON files (JSON Lines format) into a Delta table. The JSON files are located in an Azure Data Lake Gen 2 account without hierarchical namespacing; the Delta table is in an Azure Data Lake Gen 2 account with hierarchical namespacing.
We have a PySpark notebook in our Synapse Analytics workspace which reads the JSON files into a DataFrame and then writes them to the Delta table. It uses batch processing.
The JSON files contain no corrupt records; we checked them thoroughly. There are also no flaws in our PySpark notebook code; we verified that as well.
Our code reads 15 TB of JSON files (each file is about 400 MB in size) into our PySpark DataFrame in the following way.
originalDF = (
    spark.read
        .schema(originDataSchema)
        .option("pathGlobFilter", DESIRED_FILE_PATTERN)
        .option("mode", "PERMISSIVE")
        .option("columnNameOfCorruptRecord", "DiscoveredCorruptRecords")
        .option("badRecordsPath", BAD_RECORDS_PATH)
        .json(ORIGIN_FILES_PATH)
)
To read this data and then write it to a Delta table takes about 37 minutes.
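For completeness, the write step is an ordinary batch write to the Delta table, roughly like the sketch below (DELTA_TABLE_PATH and the save mode are illustrative placeholders for what our notebook actually uses):

# Simplified version of our write step; the real notebook uses our ADLS Gen 2
# table location and a few additional options.
(
    originalDF.write
        .format("delta")
        .mode("append")
        .save(DELTA_TABLE_PATH)
)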
The problem we noticed is that, as the data is read into the PySpark DataFrame, a small percentage of it becomes corrupted - only about 1 in 10 million records. This is a made-up example to illustrate the point:
// The original JSON record looks like this
{ "Name": "Robert", "Email": "bob@gmail.com", "Nickname": "Bob" }

// When we look in the PySpark DataFrame we see this (for a small percent of records)
{ "Name": "Robertbob@", "Email": "gmail.com", "Nickname": "Bob" }
Essentially, spark.read() has some deserialization problem that only emerges at high data throughput (> 20 TB/hr).
When we tried using a smaller dataset (1/4 the size), it didn't show any signs of corruption.
When we use the exact same code to parse just one JSON file containing the record mentioned above, everything works perfectly fine.
The spark.read() corruption is also not deterministic. If we re-run the 20 TB/hr test, we still see corruption but in different records.
Our Temporary Solution
What we noticed is that "spark.sql.files.maxPartitionBytes" was set to its default of 128 MB. This meant that for the average JSON file we were reading - which was about 400 MB - Spark was making four calls to the Azure Data Lake, each fetching a byte range (i.e. the 1st call got bytes 0-128 MB, the 2nd call got bytes 128-256 MB, etc.).
We increased "spark.sql.files.maxPartitionBytes" to a large number (1 GB) and that made the data corruption problem go away.
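For anyone else hitting this, the workaround is a one-line config change in the notebook (shown below with the 1 GB value we used):

# Workaround: make the max partition size larger than our largest JSON file (~400 MB),
# so each file is read as a single byte range instead of four.
spark.conf.set("spark.sql.files.maxPartitionBytes", "1g")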
How We Think You Can Fix This
From my understanding, when Spark makes a call for a byte range, it will often "cut off" the data in the middle of a JSON record. Our JSON files are in the JSON Lines format and contain thousands of lines, each with a JSON record, so requesting a byte range from 0-128 MB will most likely mean that the cutoff point falls in the middle of a JSON record.
Spark seems to have some logic which handles this by only processing the "full lines" that are received, but this logic seems to be failing a small percentage of the time. Specifically, we have about 50,000 JSON files, which means ~200,000 byte-range calls are being made, and spark.read() is creating about 150 corrupt records.
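To make our mental model concrete, here is a simplified Python sketch of the behavior we expect for each byte-range split (this is only our understanding of the intended logic, not the actual Spark implementation):

def read_split(data: bytes, start: int, end: int) -> list[bytes]:
    """Simplified model: a split owns every line that *starts* inside [start, end).
    It skips the partial line at its beginning (owned by the previous split) and
    reads past `end` to finish the last line it owns."""
    pos = start
    if start > 0 and data[start - 1:start] != b"\n":
        # We landed in the middle of a line owned by the previous split; skip it.
        nl = data.find(b"\n", start)
        pos = len(data) if nl == -1 else nl + 1
    records = []
    while pos < end and pos < len(data):
        nl = data.find(b"\n", pos)
        line_end = len(data) if nl == -1 else nl
        records.append(data[pos:line_end])  # one complete JSON Lines record
        pos = line_end + 1
    return records

If this invariant held 100% of the time, no byte-range boundary could ever split a record, which is why we suspect the boundary handling (or the deserialization downstream of it) is where the corruption comes from.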
So we think you should look at the Spark code which does this "cut off" handling for byte ranges and check whether something is missing there, or in the deserialization logic of spark.read().
Again, this bug only emerges for high volumes of data transfer (> 20 TB/hr). This could be a "race condition" or some kind of performance-related bug.