Details
Type: Bug
Status: Resolved
Priority: Blocker
Resolution: Fixed
Version: 3.1.0
Description
Under certain circumstances, the V1 Avro reader drops rows. For example:
scala> val df = spark.range(0, 25).toDF("index")
df: org.apache.spark.sql.DataFrame = [index: bigint]

scala> df.write.mode("overwrite").format("avro").save("index_avro")

scala> val loaded = spark.read.format("avro").load("index_avro")
loaded: org.apache.spark.sql.DataFrame = [index: bigint]

scala> loaded.collect.size
res1: Int = 25

scala> loaded.orderBy("index").collect.size
res2: Int = 17   <== expected 25

scala> loaded.orderBy("index").write.mode("overwrite").format("parquet").save("index_as_parquet")

scala> spark.read.parquet("index_as_parquet").count
res4: Long = 17
SPARK-32346 refactored AvroFileFormat and AvroPartitionReaderFactory to use a new iterator-like trait, AvroUtils#RowReader. RowReader#hasNextRow consumes a raw input record and stores the deserialized row for the next call to RowReader#nextRow. Unfortunately, hasNextRow is sometimes called twice before nextRow is called; the second call consumes another record, so the stored row is lost. For example, BypassMergeSortShuffleWriter#write calls records.hasNext once and then calls it again before its first call to records.next.
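To make the failure mode concrete, here is a Spark-free sketch that models a hasNextRow which always consumes. Everything in it is hypothetical and is not the actual RowReader code: raw records are Ints, `deserialize` stands in for AvroDeserializer#deserialize (None meaning "filtered out"), and `drainWithExtraHasNext` only imitates the check-then-loop shape of a caller such as BypassMergeSortShuffleWriter#write.

```scala
// Hypothetical, Spark-free model of the buggy reader: raw records are Ints and
// `deserialize` stands in for AvroDeserializer#deserialize (None = filtered out).
class BuggyRowReader(raw: Iterator[Int], deserialize: Int => Option[Int]) {
  private var currentRow: Option[Int] = None

  // Always consumes raw records until one survives deserialization, even when a
  // row is already buffered, so a second call discards the buffered row.
  def hasNextRow: Boolean = {
    currentRow = None
    while (currentRow.isEmpty && raw.hasNext) {
      currentRow = deserialize(raw.next())
    }
    currentRow.isDefined
  }

  def nextRow: Int =
    currentRow.getOrElse(throw new NoSuchElementException("no buffered row"))
}

// Imitates a caller that checks hasNext once up front and then again at the top
// of its loop, before the first call to next.
def drainWithExtraHasNext(reader: BuggyRowReader): List[Int] = {
  val out = List.newBuilder[Int]
  if (reader.hasNextRow) {
    while (reader.hasNextRow) {
      out += reader.nextRow
    }
  }
  out.result()
}

val rows = drainWithExtraHasNext(new BuggyRowReader((0 until 5).iterator, Some(_)))
println(rows) // List(1, 2, 3, 4): row 0 was buffered by the first hasNextRow, then lost
```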
RowReader consumes the Avro record in hasNextRow, rather than in nextRow, because AvroDeserializer#deserialize may filter out the record: hasNextRow cannot tell whether another row exists without first deserializing the next record.
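A short hypothetical illustration of why that lookahead is needed: if hasNext merely asked whether a raw record remains, it could return true even when every remaining record is filtered out. The even-only filter below is an assumption standing in for pushed-down filters, and `naiveHasNext`/`naiveNext` are illustrative names, not Spark APIs.

```scala
// Hypothetical filter standing in for pushed-down filters: keep only even values.
val deserialize: Int => Option[Int] = v => if (v % 2 == 0) Some(v) else None

// Naive design: hasNext only asks whether an undeserialized raw record remains.
val raw = Iterator(2, 3)
def naiveHasNext: Boolean = raw.hasNext
def naiveNext(): Option[Int] = deserialize(raw.next())

val first = naiveNext()    // Some(2)
val claimed = naiveHasNext // true: raw record 3 is still unread...
val second = naiveNext()   // ...but it is filtered out, so there is no row to return
println(s"$first $claimed $second") // Some(2) true None
```

Answering hasNext truthfully therefore means pulling and deserializing records until one survives, which is the buffering that RowReader#hasNextRow performs.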
I thought of two possible fixes:
1) Keep state in RowReader so that multiple calls to RowReader#hasNextRow with no intervening call to RowReader#nextRow consume at most one Avro record. This requires no changes to any code that extends RowReader, only to RowReader itself.
2) Move record consumption to RowReader#nextRow (so that RowReader#nextRow may return None) and wrap any iterator that extends RowReader in a new iterator created with flatMap. The wrapping iterator filters out the Nones and extracts the rows from the Somes. This requires changes to AvroFileFormat and AvroPartitionReaderFactory as well as to RowReader.
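Option 2 could look roughly like the sketch below. It is a hypothetical, Spark-free model: `OptionRowReader` and `rows` are illustrative names, raw records are Ints, and the lambda passed as `deserialize` stands in for AvroDeserializer#deserialize.

```scala
// Hypothetical model of option 2: nextRow consumes exactly one raw record and
// returns None when deserialization filters it out.
class OptionRowReader(raw: Iterator[Int], deserialize: Int => Option[Int]) {
  // No lookahead and no buffered row, so repeated calls consume nothing.
  def hasNextRow: Boolean = raw.hasNext
  def nextRow: Option[Int] = deserialize(raw.next())
}

// Wrapping iterator: flatMap drops the Nones and unwraps the Somes.
def rows(reader: OptionRowReader): Iterator[Int] = {
  val maybeRows: Iterator[Option[Int]] = new Iterator[Option[Int]] {
    override def hasNext: Boolean = reader.hasNextRow
    override def next(): Option[Int] = reader.nextRow
  }
  maybeRows.flatMap(_.toList)
}

val evens = rows(new OptionRowReader((1 to 6).iterator, v => if (v % 2 == 0) Some(v) else None))
println(evens.toList) // List(2, 4, 6)
```

The extra hasNext calls are now safe, but every place that builds a reader has to apply the wrapper, which is why this option touches AvroFileFormat and AvroPartitionReaderFactory as well.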
The first option seems simplest and most straightforward, and it requires changes only to AvroUtils#RowReader, not to AvroFileFormat or AvroPartitionReaderFactory, so I propose it.
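A minimal sketch of option 1, again as a hypothetical Spark-free model rather than the actual patch: `StatefulRowReader` and `drainWithExtraHasNext` are illustrative names, raw records are Ints, and closing the underlying file reader is not modeled.

```scala
// Hypothetical model of option 1: hasNextRow buffers at most one deserialized
// row, so repeated calls without an intervening nextRow consume nothing more.
class StatefulRowReader(raw: Iterator[Int], deserialize: Int => Option[Int]) {
  private var currentRow: Option[Int] = None

  def hasNextRow: Boolean = {
    // Pull raw records only while nothing is buffered, skipping filtered ones.
    while (currentRow.isEmpty && raw.hasNext) {
      currentRow = deserialize(raw.next())
    }
    currentRow.isDefined
  }

  def nextRow: Int = {
    if (!hasNextRow) throw new NoSuchElementException("next on empty reader")
    val row = currentRow.get
    currentRow = None // let the next hasNextRow consume another raw record
    row
  }
}

// Same check-then-loop caller shape that loses a row with a consuming hasNextRow.
def drainWithExtraHasNext(reader: StatefulRowReader): List[Int] = {
  val out = List.newBuilder[Int]
  if (reader.hasNextRow) {
    while (reader.hasNextRow) {
      out += reader.nextRow
    }
  }
  out.result()
}

println(drainWithExtraHasNext(new StatefulRowReader((0 until 5).iterator, Some(_)))) // List(0, 1, 2, 3, 4)
```

Keeping the buffered row between calls makes hasNextRow idempotent, which is what the Iterator contract expects of hasNext, while the filtering still happens inside hasNextRow.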
Issue Links
- is caused by: SPARK-32346 Support filters pushdown in Avro datasource (Resolved)