Apache Arrow / ARROW-15069

[R] open_dataset very slow on heavily partitioned parquet dataset


Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 6.0.1
    • Fix Version/s: None
    • Component/s: R
    • Labels: None
    • Environment: macOS Mojave, R 4.1.1

    Description

      Opening a (particular) partitioned hive-style parquet dataset is very slow (45 s to 1 minute). I have a reproducible example below that takes 780 csv files and writes them to parquet using the suggested open_dataset("csv files") |> group_by(vars) |> write_dataset("parquet") pattern. Opening and querying the resulting parquet dataset is much slower than doing the same on the original csv files, which is not what I expected.

      library(arrow)
      library(dplyr)
      library(tictoc)
      
      zipfile <- "ahccd.zip"
      csv_dir <- "data/csv"
      parquet_dir <- "data/parquet"
      
      dir.create(csv_dir, recursive = TRUE)
      dir.create(parquet_dir, recursive = TRUE)
      
      # A zip of 780 csvs of daily temperature data at Canadian climate stations (one file per station)
      download.file("https://www.dropbox.com/s/f0a18jp0lvbp1hp/ahccd.zip?dl=1", destfile = zipfile)
      
      unzip(zipfile, exdir = csv_dir)
      
      csv_schema <- schema(
        field("stn_id", string()),
        field("stn_name", string()),
        field("measure", string()),
        field("date", date32()),
        field("year", int64()),
        field("month", int64()),
        field("temp", double()),
        field("province", string()),
        field("stn_joined", string()),
        field("element", string()),
        field("unit", string()),
        field("stn_last_updated", string()),
        field("flag", string())
      )
      
      csv_format <- FileFormat$create(format = "csv", quoting = FALSE)
      
      # Write to parquet, partitioning on stn_id, year, measure
      tic("write csv to parquet")
      arrow::open_dataset("data/csv", schema = csv_schema,
                          format = csv_format) |>
        group_by(stn_id, year, measure) |>
        write_dataset(parquet_dir, format = "parquet")
      toc()
      #> write csv to parquet: 2067.093 sec elapsed
      
      stations <- c("1100031", "1100120", "1100119", "1036B06")
      
      ## Query directory of original csv files
      tic("query csv")
      foo <- arrow::open_dataset(csv_dir, schema = csv_schema,
                                 format = csv_format) |>
        filter(year >= 1990,
               year <= 2020,
               stn_id %in% stations,
               measure == "daily_max") |>
        collect()
      toc()
      #> query csv: 12.571 sec elapsed
      
      ## Query the hive-style parquet directory
      tic("query parquet")
      bar <- arrow::open_dataset("data/parquet") |>
        filter(year >= 1990,
               year <= 2020,
               stn_id %in% stations,
               measure == "daily_max") |>
        collect()
      toc()
      #> query parquet: 41.79 sec elapsed
      
      ## It turns out that it is just the opening of the dataset 
      ## that takes so long
      tic("open parquet dataset")
      ds <- arrow::open_dataset(parquet_dir)
      toc()
      #> open parquet dataset: 45.581 sec elapsed
      
      ds
      #> FileSystemDataset with 191171 Parquet files
      #> stn_name: string
      #> date: date32[day]
      #> month: int64
      #> temp: double
      #> province: string
      #> stn_joined: string
      #> element: string
      #> unit: string
      #> stn_last_updated: string
      #> flag: string
      #> stn_id: string
      #> year: int32
      #> measure: string
      
      tic("query already openend dataset")
      ds |> 
        filter(year >= 1990,
               year <= 2020,
               stn_id %in% stations,
               measure == "daily_max") |>
        collect()
      #> # A tibble: 44,469 × 13
      #>    stn_name date       month  temp province stn_joined     element        unit  
      #>    <chr>    <date>     <int> <dbl> <chr>    <chr>          <chr>          <chr> 
      #>  1 ALBERNI  1992-01-01     1   6.5 BC       station joined Homogenized d… Deg C…
      #>  2 ALBERNI  1992-01-02     1   5.5 BC       station joined Homogenized d… Deg C…
      #>  3 ALBERNI  1992-01-03     1   3.5 BC       station joined Homogenized d… Deg C…
      #>  4 ALBERNI  1992-01-04     1   6   BC       station joined Homogenized d… Deg C…
      #>  5 ALBERNI  1992-01-05     1   0.5 BC       station joined Homogenized d… Deg C…
      #>  6 ALBERNI  1992-01-06     1   0   BC       station joined Homogenized d… Deg C…
      #>  7 ALBERNI  1992-01-07     1   0   BC       station joined Homogenized d… Deg C…
      #>  8 ALBERNI  1992-01-08     1   1.5 BC       station joined Homogenized d… Deg C…
      #>  9 ALBERNI  1992-01-09     1   4   BC       station joined Homogenized d… Deg C…
      #> 10 ALBERNI  1992-01-10     1   5.5 BC       station joined Homogenized d… Deg C…
      #> # … with 44,459 more rows, and 5 more variables: stn_last_updated <chr>,
      #> #   flag <chr>, stn_id <chr>, year <int>, measure <chr>
      
      toc()
      #> query already opened dataset: 0.356 sec elapsed
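      
      ## For context, a quick sketch (assumes the reprex objects above exist):
      ## partitioning on stn_id x year x measure leaves one small file per
      ## combination, so open_dataset() has to list and index the whole tree
      ## of ~191k files, which is likely where the time goes. Counting the
      ## files on disk should roughly match the 191171 Parquet files reported
      ## in the dataset print-out above.
      length(list.files(parquet_dir, recursive = TRUE))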
      

      The above reprex is self-contained, but it takes a while to run; in particular, writing the parquet dataset can take up to 30 minutes. It also downloads a 380 MB zip file of csvs from my Dropbox.
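
      A possible mitigation to try (a sketch only; I have not verified it against this dataset): give open_dataset() the hive partition schema explicitly and skip schema unification, so less has to be inferred from the ~191k files during discovery. The partition column types below mirror the dataset print-out above, and parquet_dir is the object from the reprex.

      library(arrow)

      ds <- open_dataset(
        parquet_dir,
        partitioning = hive_partition(
          stn_id = string(),
          year = int32(),
          measure = string()
        ),
        unify_schemas = FALSE
      )

      Another option is simply to partition on fewer or lower-cardinality columns (for example, dropping stn_id from the group_by()), which keeps the total file count, and therefore the discovery cost, much smaller.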

People

    Assignee: Unassigned
    Reporter: Andy Teucher (ateucher)
    Votes: 0
    Watchers: 6
