Description
Currently, if there are dots in a column name, predicate pushdown seems to fail in Parquet.
*With dots*
{code}
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("col.dots").write.parquet(path)
spark.read.parquet(path).where("`col.dots` IS NOT NULL").show()
{code}
{code}
+--------+
|col.dots|
+--------+
+--------+
{code}
*Without dots*
{code}
val path = "/tmp/abcde2"
Seq(Some(1), None).toDF("coldots").write.parquet(path)
spark.read.parquet(path).where("`coldots` IS NOT NULL").show()
{code}
{code}
+-------+
|coldots|
+-------+
|      1|
+-------+
{code}
It seems FilterApi treats dots in a column name as path separators and splits the name into multiple segments (a ColumnPath with multiple elements), whereas the actual column name is col.dots. (See FilterApi.java#L71, which calls ColumnPath.java#L44.)
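To illustrate, a minimal sketch against parquet-mr's ColumnPath (not Spark code):

{code}
import org.apache.parquet.hadoop.metadata.ColumnPath

// fromDotString is what FilterApi's column factories use; it splits on dots.
val nested = ColumnPath.fromDotString("col.dots")
println(nested.toArray.mkString(", "))  // col, dots  -> two path segments

// get(...) keeps the whole string as a single path element.
val flat = ColumnPath.get("col.dots")
println(flat.toArray.mkString(", "))    // col.dots   -> one path segment
{code}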
I tried to think of ways to resolve this and came up with the two options below:
One is to simply not push down filters when there are dots in the column names, so that Parquet reads everything and Spark filters the rows on its side (a rough sketch follows below).
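A minimal sketch of the first option, assuming a guard at the top of ParquetFilters.createFilter (translate here is a hypothetical stand-in for the existing translation logic):

{code}
import org.apache.parquet.filter2.predicate.FilterPredicate
import org.apache.spark.sql.sources

// Hypothetical guard: skip pushdown when any referenced column name
// contains a dot, and let Spark evaluate the filter after the scan.
def createFilter(predicate: sources.Filter): Option[FilterPredicate] = {
  if (predicate.references.exists(_.contains("."))) {
    None  // not pushed down; Parquet reads all rows and Spark filters them
  } else {
    translate(predicate)  // stand-in for the existing translation logic
  }
}

def translate(predicate: sources.Filter): Option[FilterPredicate] = ???
{code}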
The other is to create Spark's own equivalent of FilterApi's column factories (FilterApi appears to be final) so that a single column path element is always used on the Spark side (this seems hacky); since we are not pushing down nested columns currently, this should be safe. In this approach, the field name is obtained via ColumnPath.get rather than ColumnPath.fromDotString.
I made a rough version of the latter:
{code}
import org.apache.parquet.filter2.predicate.Operators.{Column, SupportsEqNotEq, SupportsLtGt}
import org.apache.parquet.hadoop.metadata.ColumnPath
import org.apache.parquet.io.api.Binary

private[parquet] object ParquetColumns {

  def intColumn(columnPath: String): Column[Integer] with SupportsLtGt = {
    new Column[Integer](ColumnPath.get(columnPath), classOf[Integer]) with SupportsLtGt
  }

  def longColumn(columnPath: String): Column[java.lang.Long] with SupportsLtGt = {
    new Column[java.lang.Long](
      ColumnPath.get(columnPath), classOf[java.lang.Long]) with SupportsLtGt
  }

  def floatColumn(columnPath: String): Column[java.lang.Float] with SupportsLtGt = {
    new Column[java.lang.Float](
      ColumnPath.get(columnPath), classOf[java.lang.Float]) with SupportsLtGt
  }

  def doubleColumn(columnPath: String): Column[java.lang.Double] with SupportsLtGt = {
    new Column[java.lang.Double](
      ColumnPath.get(columnPath), classOf[java.lang.Double]) with SupportsLtGt
  }

  def booleanColumn(columnPath: String): Column[java.lang.Boolean] with SupportsEqNotEq = {
    new Column[java.lang.Boolean](
      ColumnPath.get(columnPath), classOf[java.lang.Boolean]) with SupportsEqNotEq
  }

  def binaryColumn(columnPath: String): Column[Binary] with SupportsLtGt = {
    new Column[Binary](ColumnPath.get(columnPath), classOf[Binary]) with SupportsLtGt
  }
}
{code}
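For instance, where ParquetFilters currently builds predicates through FilterApi's factories, it would call these helpers instead. A small comparison sketch (getColumnPath is on parquet's Operators.Column):

{code}
import org.apache.parquet.filter2.predicate.FilterApi

val name = "col.dots"

// FilterApi splits the name into two path segments ...
println(FilterApi.intColumn(name).getColumnPath.toArray.length)      // 2
// ... whereas the Spark-side factory keeps it as a single segment.
println(ParquetColumns.intColumn(name).getColumnPath.toArray.length) // 1

// The repro's `col.dots` IS NOT NULL would then be pushed down as:
val pred = FilterApi.notEq(ParquetColumns.intColumn(name), null.asInstanceOf[Integer])
{code}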
Neither sounds ideal. Please let me know if anyone has a better idea.
Issue Links
- is related to PARQUET-389 Filter predicates should work with missing columns (Resolved)