Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Description
We can project to add augmented fields like __filename, but there are a few catches. Given:
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)
ds <- InMemoryDataset$create(mtcars) %>%
  mutate(f = add_filename())
show_query(ds)
#> ExecPlan with 3 nodes:
#> 2:SinkNode{}
#> 1:ProjectNode{projection=[mpg, cyl, disp, hp, drat, wt, qsec, vs, am, gear, carb, "f": __filename]}
#> 0:SourceNode{}
collect(ds)
#> mpg cyl disp hp drat wt qsec vs am gear carb f
#> 1 21.0 6 160.0 110 3.90 2.620 16.46 0 1 4 4 in-memory
#> 2 21.0 6 160.0 110 3.90 2.875 17.02 0 1 4 4 in-memory
#> 3 22.8 4 108.0 93 3.85 2.320 18.61 1 1 4 1 in-memory
...
Issue #1: you can't filter on that column. My theory, based on the evidence, is that the ScanNode takes both a projection and a filter, but the filter is not evaluated against the augmented schema, so it can't find __filename. This seems fixable in C++.
ds %>%
  filter(f == "in-memory") %>%
  collect()
#> Error in `collect()`:
#> ! Invalid: No match for FieldRef.Name(__filename) in mpg: double
#> cyl: double
#> disp: double
#> hp: double
#> drat: double
#> wt: double
#> qsec: double
#> vs: double
#> am: double
#> gear: double
#> carb: double
#> ℹ `add_filename()` or use of the `__filename` augmented field can only be used with with Dataset objects, and can only be added before doing an aggregation or a join.
#> Backtrace:
#> ▆
#> 1. ├─ds %>% filter(f == "in-memory") %>% collect()
#> 2. ├─dplyr::collect(.)
#> 3. └─arrow:::collect.arrow_dplyr_query(.)
#> 4. └─base::tryCatch(...)
#> 5. └─base (local) tryCatchList(expr, classes, parentenv, handlers)
#> 6. └─base (local) tryCatchOne(expr, names, parentenv, handlers[[1L]])
#> 7. └─value[[3L]](cond)
#> 8. └─arrow:::augment_io_error_msg(e, call, schema = x$.data$schema)
#> 9. └─arrow:::handle_augmented_field_misuse(msg, call)
#> 10. └─rlang::abort(msg, call = call)
Proof that it is in the ScanNode: if we collapse() the query after projecting to include the filename but before filtering, the filter is not included in the ScanNode; it is only applied afterwards, as a FilterNode. This works:
ds %>%
  collapse() %>%
  filter(f == "in-memory") %>%
  collect()
#> mpg cyl disp hp drat wt qsec vs am gear carb f
#> 1 21.0 6 160.0 110 3.90 2.620 16.46 0 1 4 4 in-memory
#> 2 21.0 6 160.0 110 3.90 2.875 17.02 0 1 4 4 in-memory
#> 3 22.8 4 108.0 93 3.85 2.320 18.61 1 1 4 1 in-memory
...
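To check where the filter ends up in the collapsed case, one option is to print the plan instead of collecting it. The sketch below reuses only the calls already shown in this report; the name q_collapsed is introduced here for illustration, and the exact plan output is not reproduced because it may vary between arrow versions.

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# Same setup as the reprex above.
ds <- InMemoryDataset$create(mtcars) %>%
  mutate(f = add_filename())

# Collapse first so the projection is fixed before the filter is added.
q_collapsed <- ds %>%
  collapse() %>%
  filter(f == "in-memory")

# Print the plan without running it; look for where the filter appears.
show_query(q_collapsed)
```

If the theory above holds, the filter should show up as its own node after the projection rather than inside the scan.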
A related failure mode: you have to project first to include the augmented column; you can't reference it only in a filter:
InMemoryDataset$create(mtcars) %>%
  filter(add_filename() == "in-memory") %>%
  collect()
#> Error in `collect()`:
#> ! Invalid: No match for FieldRef.Name(__filename) in mpg: double
#> cyl: double
#> disp: double
#> hp: double
#> drat: double
#> wt: double
#> qsec: double
#> vs: double
#> am: double
#> gear: double
#> carb: double
#> ℹ `add_filename()` or use of the `__filename` augmented field can only be used with with Dataset objects, and can only be added before doing an aggregation or a join.
#> Backtrace:
#> ▆
#> 1. ├─... %>% collect()
#> 2. ├─dplyr::collect(.)
#> 3. └─arrow:::collect.arrow_dplyr_query(.)
#> 4. └─base::tryCatch(...)
#> 5. └─base (local) tryCatchList(expr, classes, parentenv, handlers)
#> 6. └─base (local) tryCatchOne(expr, names, parentenv, handlers[[1L]])
#> 7. └─value[[3L]](cond)
#> 8. └─arrow:::augment_io_error_msg(e, call, schema = x$.data$schema)
#> 9. └─arrow:::handle_augmented_field_misuse(msg, call)
#> 10. └─rlang::abort(msg, call = call)
Issue #2, following on from that: you can only add the augmented fields at the start of the query, in the part that goes into the ScanNode. This seems like something we would have to catch in R, raising an error at the time add_filename() is called. That could probably be covered in ARROW-17356.
InMemoryDataset$create(mtcars) %>%
  collapse() %>%
  collapse() %>%
  filter(add_filename() == "in-memory") %>%
  collect()
#> Error in `collect()`:
#> ! Invalid: No match for FieldRef.Name(__filename) in mpg: double
#> cyl: double
#> disp: double
#> hp: double
#> drat: double
#> wt: double
#> qsec: double
#> vs: double
#> am: double
#> gear: double
#> carb: double
#> ℹ `add_filename()` or use of the `__filename` augmented field can only be used with with Dataset objects, and can only be added before doing an aggregation or a join.
#> Backtrace:
#> ▆
#> 1. ├─... %>% collect()
#> 2. ├─dplyr::collect(.)
#> 3. └─arrow:::collect.arrow_dplyr_query(.)
#> 4. └─base::tryCatch(...)
#> 5. └─base (local) tryCatchList(expr, classes, parentenv, handlers)
#> 6. └─base (local) tryCatchOne(expr, names, parentenv, handlers[[1L]])
#> 7. └─value[[3L]](cond)
#> 8. └─arrow:::augment_io_error_msg(e, call, schema = x$.data$schema)
#> 9. └─arrow:::handle_augmented_field_misuse(msg, call)
#> 10. └─rlang::abort(msg, call = call)
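As a rough illustration of what an R-side check for Issue #2 might look like, here is a minimal sketch in base R. It is not the arrow implementation: mock_query(), can_add_augmented_field() and check_augmented_field() are hypothetical names, the objects are mocks rather than real arrow classes, and it assumes that a query keeps its source in `.data` (as `x$.data$schema` in the backtrace suggests) and that collapse() nests the previous query there.

```r
# Hypothetical sketch: an R-side guard for augmented fields.
# Assumption: a query keeps its source in `.data`, and collapse() nests the
# previous query there. Mock objects stand in for real arrow classes.

# Build a mock query whose source is `source`.
mock_query <- function(source) {
  structure(list(.data = source), class = "arrow_dplyr_query")
}

# TRUE only when the query reads directly from a Dataset, i.e. the augmented
# field could still be resolved by the scan.
can_add_augmented_field <- function(query) {
  inherits(query$.data, "Dataset")
}

# Hypothetical check to run at the time add_filename() is used in a query.
check_augmented_field <- function(query) {
  if (!can_add_augmented_field(query)) {
    stop(
      "`add_filename()` can only be used directly on a Dataset, ",
      "before collapse(), an aggregation, or a join.",
      call. = FALSE
    )
  }
  invisible(query)
}

mock_dataset <- structure(list(), class = c("InMemoryDataset", "Dataset"))
direct <- mock_query(mock_dataset)
collapsed <- mock_query(mock_query(mock_dataset))

check_augmented_field(direct) # passes silently

# The collapsed query is rejected up front; capture the message to inspect it.
result <- tryCatch(
  check_augmented_field(collapsed),
  error = function(e) conditionMessage(e)
)
print(result)
```

Failing at call time like this would replace the late `No match for FieldRef.Name(__filename)` error from collect() with a message that points at the actual misuse.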