Apache Arrow / ARROW-14908

[R] join on dataset crashes on Windows

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 6.0.0
    • Fix Version/s: 7.0.2, 8.0.0
    • Component/s: R
    • Environment: R version 4.0.4

    Description

      library(tidyverse)
      library(arrow)
      
      car_info <- rownames_to_column(mtcars, "car_info") 
      
      cars_arrow_table <- arrow_table(car_info)
      
      other_mtcars_data <- select(car_info, 1) %>% 
        mutate(main_color = sample( c("red", "blue", "white", "black"), size = n(), replace = TRUE)) %>% 
        arrow::arrow_table()
      
      temp <- tempdir()
      par_temp <- paste0(temp, "\\parquet")
      
      car_info %>% arrow::write_dataset(par_temp)
      cars_arrow <- arrow::open_dataset(par_temp) 
      
      # using arrow tables works ------------------------------------------------------
      cars_arrow_table %>% left_join(other_mtcars_data) %>% count(main_color) %>% collect()
      
      # using open dataset crashes R ------------------------------------------------------------------
      other_mtcars_data %>% 
        left_join(cars_arrow) %>% 
        count(main_color) %>% 
        collect()
      
      # other variations also crash
      cars_arrow %>% 
        left_join(other_mtcars_data) %>% 
        count(main_color) %>% 
        collect()
      
      cars_arrow %>% 
        left_join(other_mtcars_data) %>% 
        group_by(main_color) %>% 
        summarise(n = n()) %>% 
        collect()
      
      #compute also crashes
      cars_arrow %>% 
        left_join(other_mtcars_data) %>% 
        count(main_color) %>% 
        compute()
      
      # workaround with duckdb ------------------------------------------------------
      ##this works
      cars_duck <- to_duckdb(cars_arrow, auto_disconnect = TRUE)
      other_cars_duck <- to_duckdb(other_mtcars_data, auto_disconnect = TRUE)
          
      cars_duck %>% 
        left_join(other_cars_duck) %>%
        count(main_color) %>%
        collect()
      
      ##this doesn't (don't know if expected to work actually)
      cars_arrow %>% 
        left_join(other_mtcars_data) %>% 
        to_duckdb() 

          Activity

            githubbot ASF GitHub Bot logged work - 04/Feb/22 16:22
            githubbot ASF GitHub Bot logged work - 04/Feb/22 17:01
            • Time Spent:
              10m
               
              wjones127 commented on pull request #12339:
              URL: https://github.com/apache/arrow/pull/12339#issuecomment-1030173474


                 My best guess right now is that some memory is getting clobbered by another thread. Is there a way to execute on a single thread somehow? I tried `set_cpu_count(1)` and `set_io_thread_count(1)` (and `USE_THREADS=FALSE` is already the default on Windows), but hit the same issue.
                 
                 @jonkeane Any thoughts on the threading settings?
                 @bkietz I know this isn't a lot to go on, but does anything stand out to you in the above tracebacks?


              --
              This is an automated message from the Apache Git Service.
              To respond to the message, please log on to GitHub and use the
              URL above to go to the specific comment.

              To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

              For queries about this service, please contact Infrastructure at:
              users@infra.apache.org
            githubbot ASF GitHub Bot logged work - 09/Feb/22 22:33
            • Time Spent:
              10m
               
              wjones127 commented on a change in pull request #12339:
              URL: https://github.com/apache/arrow/pull/12339#discussion_r803145529



              ##########
              File path: cpp/src/arrow/compute/exec/hash_join.cc
              ##########
              @@ -151,6 +151,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
                 }
               
                 void InitLocalStateIfNeeded(size_t thread_index) {
              + DCHECK_LT(thread_index, local_states_.size());
                   ThreadLocalState& local_state = local_states_[thread_index];

              Review comment:
                     So far I've found that this DCHECK is not passing. So either we are missing a `ThreadLocalState` or the thread_index being passed is wrong.
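
To illustrate what the added check guards against, here is a minimal standard-library sketch (not Arrow code). `assert` stands in for `DCHECK_LT`, `IsValidThreadIndex` and `LocalStateAt` are hypothetical names, and `ThreadLocalState` is a stub rather than the struct in `hash_join.cc`.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Stub for illustration only; the real struct holds more fields.
struct ThreadLocalState {
  bool is_initialized = false;
};

// True when thread_index refers to an existing slot.
bool IsValidThreadIndex(const std::vector<ThreadLocalState>& local_states,
                        std::size_t thread_index) {
  return thread_index < local_states.size();
}

// Plain operator[] on a std::vector does no bounds checking, so an index
// that is too large is undefined behavior (often a crash, sometimes silent
// memory corruption). The assert turns that into a clear failure in debug
// builds and compiles away when NDEBUG is defined.
ThreadLocalState& LocalStateAt(std::vector<ThreadLocalState>& local_states,
                               std::size_t thread_index) {
  assert(IsValidThreadIndex(local_states, thread_index));
  return local_states[thread_index];
}
```

With a vector sized for one thread, index 0 is valid and index 1 is not, which matches the two possibilities named in the comment: either a slot is missing or the index is wrong.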




            githubbot ASF GitHub Bot logged work - 09/Feb/22 23:25
            • Time Spent:
              10m
               
              wjones127 commented on a change in pull request #12339:
              URL: https://github.com/apache/arrow/pull/12339#discussion_r803174733



              ##########
              File path: cpp/src/arrow/compute/exec/hash_join.cc
              ##########
              @@ -151,6 +151,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
                 }
               
                 void InitLocalStateIfNeeded(size_t thread_index) {
              + DCHECK_LT(thread_index, local_states_.size());
                   ThreadLocalState& local_state = local_states_[thread_index];

              Review comment:
                     It seems this fails whenever `options(arrow.use_threads = FALSE)` is set in the R session. I can trigger this on non-Windows platforms that way. Example CI failure: https://github.com/wjones127/arrow/runs/5133282492?check_suite_focus=true#step:7:10318




            githubbot ASF GitHub Bot logged work - 11/Feb/22 00:18
            • Time Spent:
              10m
               
              wjones127 commented on a change in pull request #12339:
              URL: https://github.com/apache/arrow/pull/12339#discussion_r804259267



              ##########
              File path: cpp/src/arrow/compute/exec/hash_join.cc
              ##########
              @@ -151,6 +151,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
                 }
               
                 void InitLocalStateIfNeeded(size_t thread_index) {
              + DCHECK_LT(thread_index, local_states_.size());
                   ThreadLocalState& local_state = local_states_[thread_index];

              Review comment:
                     Even with `Sys.setenv(OMP_THREAD_LIMIT = "1")` this still occurs.
                 
                 I also tried writing a C++ unit test that did a join after a dataset scan, but I couldn't reproduce the problem. That leads me to think there may be some issue with how the R bindings are configuring things, but it could also be I just didn't reproduce it quite well enough.
                 
                 Despite `use_threads = FALSE`, it seems like there are quite a few threads spawned by the engine. While I'm learning, I'm just not familiar enough to know which parts seem weird.




            githubbot ASF GitHub Bot logged work - 11/Feb/22 02:59
            • Time Spent:
              10m
               
              westonpace commented on a change in pull request #12339:
              URL: https://github.com/apache/arrow/pull/12339#discussion_r804328939



              ##########
              File path: cpp/src/arrow/compute/exec/hash_join.cc
              ##########
              @@ -151,6 +151,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
                 }
               
                 void InitLocalStateIfNeeded(size_t thread_index) {
              + DCHECK_LT(thread_index, local_states_.size());
                   ThreadLocalState& local_state = local_states_[thread_index];

              Review comment:
                     > Even with Sys.setenv(OMP_THREAD_LIMIT = "1") this still occurs.
                 
                 That isn't too surprising. `use_threads` triggers an entirely different path in some places. So it is not entirely equivalent to `OMP_THREAD_LIMIT = "1"`.
                 
                 > I also tried writing a C++ unit test that did a join after a dataset scan, but I couldn't reproduce the problem. That leads me to think there may be some issue with how the R bindings are configuring things, but it could also be I just didn't reproduce it quite well enough.
                 
                 How consistent is the R error?
                 
                 > Despite use_threads = FALSE, it seems like there are quite a few threads spawned by the engine. While I'm learning, I'm just not familiar enough to know which parts seem weird.
                 
                 `use_threads` generally does not control the I/O thread pool (which defaults to 8 threads and is not controlled by `OMP_THREAD_LIMIT`). If someone was really passionate about shoving everything onto the calling thread then there is a way to do this but it would be quite a bit of work.
                 
                 In addition, jemalloc (if compiled in), will spawn some background cleanup threads.




            githubbot ASF GitHub Bot logged work - 11/Feb/22 18:43
            • Time Spent:
              10m
               
              jonkeane commented on a change in pull request #12339:
              URL: https://github.com/apache/arrow/pull/12339#discussion_r804911698



              ##########
              File path: r/tests/testthat/test-dplyr-join.R
              ##########
              @@ -249,3 +249,24 @@ test_that("arrow dplyr query correctly filters then joins", {
                   )
                 )
               })
              +
              +
              +test_that("arrow dplyr query can join with tibble", {
              + # ARROW-14908
              + existing_use_threads <- getOption("arrow.use_threads")
              + options(arrow.use_threads = FALSE)
              + dir_out <- tempdir()
              +
              + # Note: Species is a DictionaryArray, but this still fails even if we convert to StringArray.
              + write_dataset(iris, file.path(dir_out, "iris"))
              + species_codes <- data.frame(Species = c("setosa", "versicolor", "virginica"),
              + code = c("SET", "VER", "VIR"))
              +
              + iris <- open_dataset(file.path(dir_out, "iris"))
              +
              + res <- left_join(iris, species_codes) %>% collect() # We should not segfault here.
              + expect_equal(nrow(res), 150)

              Review comment:
                     ```suggestion
                   expect_equal(nrow(res), 150)
                 ```

              ##########
              File path: r/tests/testthat/test-dplyr-join.R
              ##########
              @@ -249,3 +249,24 @@ test_that("arrow dplyr query correctly filters then joins", {
                   )
                 )
               })
              +
              +
              +test_that("arrow dplyr query can join with tibble", {
              + # ARROW-14908
              + existing_use_threads <- getOption("arrow.use_threads")
              + options(arrow.use_threads = FALSE)

              Review comment:
                     Nice test! It might be better to use `withr::with_options(list(arrow.use_threads = FALSE), { ... })` here, so you don't need to worry about resetting the option afterwards.
                 
                 Similar to https://github.com/apache/arrow/blob/3b9462a4ffc9f1d20ffc4ba578adec0f0ed8ffbd/r/tests/testthat/test-parquet.R#L302-L313

              ##########
              File path: r/tests/testthat/test-dplyr-join.R
              ##########
              @@ -249,3 +249,24 @@ test_that("arrow dplyr query correctly filters then joins", {
                   )
                 )
               })
              +
              +
              +test_that("arrow dplyr query can join with tibble", {
              + # ARROW-14908
              + existing_use_threads <- getOption("arrow.use_threads")
              + options(arrow.use_threads = FALSE)
              + dir_out <- tempdir()
              +
              + # Note: Species is a DictionaryArray, but this still fails even if we convert to StringArray.
              + write_dataset(iris, file.path(dir_out, "iris"))
              + species_codes <- data.frame(Species = c("setosa", "versicolor", "virginica"),
              + code = c("SET", "VER", "VIR"))
              +
              + iris <- open_dataset(file.path(dir_out, "iris"))
              +
              + res <- left_join(iris, species_codes) %>% collect() # We should not segfault here.
              + expect_equal(nrow(res), 150)
              +
              + # Reset
              + options(arrow.use_threads = existing_use_threads)
              +})

              Review comment:
                     ```suggestion
                 })
                 
                 ```




            githubbot ASF GitHub Bot logged work - 14/Feb/22 21:07
            • Time Spent:
              10m
               
              westonpace commented on a change in pull request #12339:
              URL: https://github.com/apache/arrow/pull/12339#discussion_r806238789



              ##########
              File path: cpp/src/arrow/compute/exec/hash_join.cc
              ##########
              @@ -103,7 +103,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
                   filter_ = std::move(filter);
                   output_batch_callback_ = std::move(output_batch_callback);
                   finished_callback_ = std::move(finished_callback);
              - local_states_.resize(num_threads);
              + local_states_.resize(num_threads + 1); // +1 for calling thread + worker thread

              Review comment:
                     ```suggestion
                     local_states_.resize(num_threads + 1); // +1 for calling thread + worker threads
                 ```

              ##########
              File path: cpp/src/arrow/compute/exec/hash_join_dict.cc
              ##########
              @@ -566,7 +566,7 @@ Status HashJoinDictBuildMulti::PostDecode(
               }
               
               void HashJoinDictProbeMulti::Init(size_t num_threads) {
              - local_states_.resize(num_threads);
              + local_states_.resize(num_threads + 1); // +1 for calling thread + worker thread

              Review comment:
                     ```suggestion
                   local_states_.resize(num_threads + 1); // +1 for calling thread + worker threads
                 ```
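
The `+ 1` in the suggestion above can be pictured with a small standard-library sketch. The slot layout is an assumption made only for this example (workers first, calling thread last); the review does not say how Arrow assigns indices, and `MakeLocalStates` and `CallingThreadSlot` are hypothetical helpers.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Stub for illustration only.
struct ThreadLocalState {
  bool is_initialized = false;
};

// Assumed layout for this sketch: worker threads use slots
// [0, num_worker_threads) and the calling thread uses one extra slot.
std::vector<ThreadLocalState> MakeLocalStates(std::size_t num_worker_threads) {
  return std::vector<ThreadLocalState>(num_worker_threads + 1);
}

// Slot reserved for the calling thread under the assumed layout.
std::size_t CallingThreadSlot(const std::vector<ThreadLocalState>& local_states) {
  assert(!local_states.empty());
  return local_states.size() - 1;
}
```

Under this layout a single worker thread needs two slots, so sizing the vector to `num_threads` alone leaves the calling thread without one.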




            githubbot ASF GitHub Bot logged work - 14/Feb/22 22:46
            githubbot ASF GitHub Bot logged work - 14/Feb/22 22:51
            githubbot ASF GitHub Bot logged work - 14/Feb/22 23:03
            githubbot ASF GitHub Bot logged work - 14/Feb/22 23:12
            githubbot ASF GitHub Bot logged work - 15/Feb/22 03:01
            githubbot ASF GitHub Bot logged work - 15/Feb/22 05:01
            githubbot ASF GitHub Bot logged work - 15/Feb/22 06:41
            githubbot ASF GitHub Bot logged work - 15/Feb/22 18:42
            githubbot ASF GitHub Bot logged work - 15/Feb/22 18:49
            githubbot ASF GitHub Bot logged work - 15/Feb/22 18:57
            githubbot ASF GitHub Bot logged work - 15/Feb/22 19:01
            githubbot ASF GitHub Bot logged work - 15/Feb/22 20:53
            githubbot ASF GitHub Bot logged work - 15/Feb/22 21:44
            • Time Spent:
              10m
               
              wjones127 commented on pull request #12437:
              URL: https://github.com/apache/arrow/pull/12437#issuecomment-1040825099


                 When I run with a fix in place for hash join paths, I sometimes get `/Users/willjones/Documents/arrows/arrow/cpp/src/arrow/compute/exec/util.cc:329: Check failed: (thread_index) < (Capacity()) thread index 9 is out of range [0, 9)`, but most of the time it succeeds.


            githubbot ASF GitHub Bot logged work - 15/Feb/22 22:06
            githubbot ASF GitHub Bot logged work - 15/Feb/22 22:09
            • Time Spent:
              10m
               
              wjones127 commented on a change in pull request #12437:
              URL: https://github.com/apache/arrow/pull/12437#discussion_r807358310



              ##########
              File path: cpp/src/arrow/compute/exec/hash_join.cc
              ##########
              @@ -896,6 +896,17 @@ class HashJoinBasicImpl : public HashJoinImpl {
                   std::vector<uint8_t> has_match;
                 };
                 std::vector<ThreadLocalState> local_states_;
              + ThreadLocalState& GetLocalState(size_t thread_index) {
              + if (ARROW_PREDICT_FALSE(thread_index >= local_states_.size())) {
              + size_t old_size = local_states_.size();
              + local_states_.resize(thread_index + 1);
              + for (size_t i = old_size; i < local_states_.size(); ++i) {
              + local_states_[i].is_initialized = false;
              + local_states_[i].is_has_match_initialized = false;
              + }
              + }
              + return local_states_[thread_index];
              + }

              Review comment:
                     So here's my suggestion for an alternative approach: Instead of trusting that the exact correct number of threads has been created (since that seems hard), gracefully resize the local state vectors as needed. I think there's a few more places I'd need to add this logic (we might even need this in the indexer; see my earlier comment about the occasional failure).
                 
                 What do you think @westonpace?




            githubbot ASF GitHub Bot logged work - 17/Feb/22 15:43
            • Time Spent:
              10m
               
              pitrou commented on a change in pull request #12437:
              URL: https://github.com/apache/arrow/pull/12437#discussion_r809190891



              ##########
              File path: cpp/src/arrow/compute/exec/hash_join.cc
              ##########
              @@ -896,6 +896,17 @@ class HashJoinBasicImpl : public HashJoinImpl {
                   std::vector<uint8_t> has_match;
                 };
                 std::vector<ThreadLocalState> local_states_;
              + ThreadLocalState& GetLocalState(size_t thread_index) {
              + if (ARROW_PREDICT_FALSE(thread_index >= local_states_.size())) {
              + size_t old_size = local_states_.size();
              + local_states_.resize(thread_index + 1);
              + for (size_t i = old_size; i < local_states_.size(); ++i) {
              + local_states_[i].is_initialized = false;
              + local_states_[i].is_has_match_initialized = false;
              + }
              + }
              + return local_states_[thread_index];
              + }

              Review comment:
                     Isn't this thread-unsafe? You're resizing a vector while it could be accessed by other threads concurrently?
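
One way the concern could be addressed, sketched with the standard library only and not taken from the PR: guard growth and lookup with a mutex, and store the states in a `std::deque`, whose existing elements keep their addresses when new ones are appended. `GrowableLocalStates` is a hypothetical name and `ThreadLocalState` is a stub.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>

// Stub for illustration only.
struct ThreadLocalState {
  bool is_initialized = false;
};

class GrowableLocalStates {
 public:
  // Returns the slot for thread_index, appending slots if needed.
  // The lock covers both the growth and the element lookup, so no thread
  // reads the container while another one is changing it.
  ThreadLocalState& Get(std::size_t thread_index) {
    std::lock_guard<std::mutex> lock(mutex_);
    while (states_.size() <= thread_index) {
      // Appending to a deque does not move existing elements, so references
      // handed out earlier stay valid. std::vector::resize may reallocate.
      states_.emplace_back();
    }
    assert(thread_index < states_.size());
    return states_[thread_index];
  }

  std::size_t size() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return states_.size();
  }

 private:
  mutable std::mutex mutex_;
  std::deque<ThreadLocalState> states_;
};
```

The sketch relies on each thread only touching the slot for its own index after `Get` returns; the lock protects the container, not the contents of a slot. It also adds a lock acquisition to every lookup, which may or may not be acceptable on a hot path.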




            githubbot ASF GitHub Bot logged work - 17/Feb/22 15:48
            githubbot ASF GitHub Bot logged work - 17/Feb/22 16:06
            • Time Spent:
              10m
               
              wjones127 commented on a change in pull request #12437:
              URL: https://github.com/apache/arrow/pull/12437#discussion_r809216290



              ##########
              File path: cpp/src/arrow/compute/exec/hash_join.cc
              ##########
              @@ -896,6 +896,17 @@ class HashJoinBasicImpl : public HashJoinImpl {
                   std::vector<uint8_t> has_match;
                 };
                 std::vector<ThreadLocalState> local_states_;
              + ThreadLocalState& GetLocalState(size_t thread_index) {
              + if (ARROW_PREDICT_FALSE(thread_index >= local_states_.size())) {
              + size_t old_size = local_states_.size();
              + local_states_.resize(thread_index + 1);
              + for (size_t i = old_size; i < local_states_.size(); ++i) {
              + local_states_[i].is_initialized = false;
              + local_states_[i].is_has_match_initialized = false;
              + }
              + }
              + return local_states_[thread_index];
              + }

              Review comment:
                     Sure, that needs to be fixed.
                 




            githubbot ASF GitHub Bot logged work - 17/Feb/22 16:07
            githubbot ASF GitHub Bot logged work - 17/Feb/22 16:11
            • Time Spent:
              10m
               
              amol- commented on a change in pull request #12437:
              URL: https://github.com/apache/arrow/pull/12437#discussion_r809221088



              ##########
              File path: cpp/src/arrow/compute/exec/hash_join.cc
              ##########
              @@ -896,6 +896,17 @@ class HashJoinBasicImpl : public HashJoinImpl {
                   std::vector<uint8_t> has_match;
                 };
                 std::vector<ThreadLocalState> local_states_;
              + ThreadLocalState& GetLocalState(size_t thread_index) {
              + if (ARROW_PREDICT_FALSE(thread_index >= local_states_.size())) {
              + size_t old_size = local_states_.size();
              + local_states_.resize(thread_index + 1);
              + for (size_t i = old_size; i < local_states_.size(); ++i) {
              + local_states_[i].is_initialized = false;
              + local_states_[i].is_has_match_initialized = false;
              + }
              + }
              + return local_states_[thread_index];
              + }

              Review comment:
                     I'm curious: given that `Init` sizes the vector to the number of threads, when is `GetLocalState` invoked with a thread index outside the already-allocated range? I thought we were using thread pools, so the number of threads was stable. Are we recycling them or something like that?




            githubbot ASF GitHub Bot logged work - 17/Feb/22 16:18
            • Time Spent:
              10m
               
              lidavidm commented on a change in pull request #12437:
              URL: https://github.com/apache/arrow/pull/12437#discussion_r809227919



              ##########
              File path: cpp/src/arrow/compute/exec/hash_join.cc
              ##########
              @@ -896,6 +896,17 @@ class HashJoinBasicImpl : public HashJoinImpl {
                   std::vector<uint8_t> has_match;
                 };
                 std::vector<ThreadLocalState> local_states_;
              + ThreadLocalState& GetLocalState(size_t thread_index) {
              + if (ARROW_PREDICT_FALSE(thread_index >= local_states_.size())) {
              + size_t old_size = local_states_.size();
              + local_states_.resize(thread_index + 1);
              + for (size_t i = old_size; i < local_states_.size(); ++i) {
              + local_states_[i].is_initialized = false;
              + local_states_[i].is_has_match_initialized = false;
              + }
              + }
              + return local_states_[thread_index];
              + }

              Review comment:
                     The thread pools don't include the main thread, for instance. Also it's sized to the CPU thread pool, but something might 'leak' from the IO thread pool.




            githubbot ASF GitHub Bot logged work - 17/Feb/22 16:19
            • Time Spent:
              10m
               
              wjones127 commented on a change in pull request #12437:
              URL: https://github.com/apache/arrow/pull/12437#discussion_r809229038



              ##########
              File path: cpp/src/arrow/compute/exec/hash_join.cc
              ##########
              @@ -896,6 +896,17 @@ class HashJoinBasicImpl : public HashJoinImpl {
                   std::vector<uint8_t> has_match;
                 };
                 std::vector<ThreadLocalState> local_states_;
              + ThreadLocalState& GetLocalState(size_t thread_index) {
              + if (ARROW_PREDICT_FALSE(thread_index >= local_states_.size())) {
              + size_t old_size = local_states_.size();
              + local_states_.resize(thread_index + 1);
              + for (size_t i = old_size; i < local_states_.size(); ++i) {
              + local_states_[i].is_initialized = false;
              + local_states_[i].is_has_match_initialized = false;
              + }
              + }
              + return local_states_[thread_index];
              + }

              Review comment:
                     The IO threads and CPU threads are separate pools, and if we don't pass an executor the source node runs the downstream nodes on the IO thread: https://github.com/apache/arrow/blob/d94365f745ac51937f010fa32efbb2ce13f90116/cpp/src/arrow/compute/exec/source_node.cc#L128
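
The effect of work arriving from more than one pool can be sketched with a generic "first seen" indexer. This is an assumption-laden illustration, not Arrow's implementation: `FirstSeenIndexer`, `ThreadIndexer`, and `CurrentThreadIndex` are hypothetical names, and the idea that indices are handed out in first-seen order is assumed for the example.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <thread>
#include <unordered_map>

// Hands out dense indices 0, 1, 2, ... in the order keys are first seen.
template <typename Key>
class FirstSeenIndexer {
 public:
  std::size_t IndexOf(const Key& key) {
    std::lock_guard<std::mutex> lock(mutex_);
    // emplace leaves an existing entry untouched, so a key keeps its index.
    auto it = indices_.emplace(key, indices_.size()).first;
    return it->second;
  }

  // Number of distinct keys seen so far.
  std::size_t distinct() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return indices_.size();
  }

 private:
  mutable std::mutex mutex_;
  std::unordered_map<Key, std::size_t> indices_;
};

// Keyed by thread id, every thread that ever calls in gets its own index,
// regardless of which pool (if any) it belongs to.
using ThreadIndexer = FirstSeenIndexer<std::thread::id>;

// Index of the calling thread; stable across repeated calls.
std::size_t CurrentThreadIndex(ThreadIndexer& indexer) {
  std::size_t index = indexer.IndexOf(std::this_thread::get_id());
  assert(index < indexer.distinct());
  return index;
}
```

If one CPU worker, one IO worker, and the calling thread all reach the join, such an indexer reports three distinct indices, so state sized to the CPU pool alone is too small.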




            githubbot ASF GitHub Bot logged work - 17/Feb/22 16:23
            • Time Spent:
              10m
               
              pitrou commented on a change in pull request #12437:
              URL: https://github.com/apache/arrow/pull/12437#discussion_r809233782



              ##########
              File path: cpp/src/arrow/compute/exec/hash_join.cc
              ##########
              @@ -896,6 +896,17 @@ class HashJoinBasicImpl : public HashJoinImpl {
                   std::vector<uint8_t> has_match;
                 };
                 std::vector<ThreadLocalState> local_states_;
              +  ThreadLocalState& GetLocalState(size_t thread_index) {
              +    if (ARROW_PREDICT_FALSE(thread_index >= local_states_.size())) {
              +      size_t old_size = local_states_.size();
              +      local_states_.resize(thread_index + 1);
              +      for (size_t i = old_size; i < local_states_.size(); ++i) {
              +        local_states_[i].is_initialized = false;
              +        local_states_[i].is_has_match_initialized = false;
              +      }
              +    }
              +    return local_states_[thread_index];
              +  }

              Review comment:
                     And by the way, the user is allowed to change thread pool capacity at runtime, so static sizing will never be correct even in the simple case of a single thread pool.
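
A small sketch of the runtime-resize problem described here, under the assumption that a plan snapshots the pool capacity once at creation. `ToyThreadPool`, `ToyPlan`, and `UncoveredIndices` are hypothetical names used only for illustration; they do not model Arrow's actual thread pool.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical pool whose capacity can change while plans are running.
struct ToyThreadPool {
  std::size_t capacity;
  void SetCapacity(std::size_t n) {
    assert(n > 0);
    capacity = n;
  }
};

// Hypothetical plan that snapshots the pool capacity once, at creation.
struct ToyPlan {
  explicit ToyPlan(const ToyThreadPool& pool) : num_states(pool.capacity) {}
  std::size_t num_states;
};

// Thread indices the pool may now hand out that the plan has no slot for.
std::vector<std::size_t> UncoveredIndices(const ToyPlan& plan,
                                          const ToyThreadPool& pool) {
  std::vector<std::size_t> out;
  for (std::size_t i = plan.num_states; i < pool.capacity; ++i) {
    out.push_back(i);
  }
  return out;
}
```

If the pool grows from 4 to 8 threads after the plan is created, indices 4 through 7 have no preallocated slot, which is the case a grow-on-demand accessor is meant to cover.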




            githubbot ASF GitHub Bot logged work - 17/Feb/22 19:07
            • Time Spent:
              10m
               
              westonpace commented on a change in pull request #12437:
              URL: https://github.com/apache/arrow/pull/12437#discussion_r809377866



              ##########
              File path: cpp/src/arrow/compute/exec/hash_join.cc
              ##########
              @@ -896,6 +896,17 @@ class HashJoinBasicImpl : public HashJoinImpl {
                   std::vector<uint8_t> has_match;
                 };
                 std::vector<ThreadLocalState> local_states_;
              +  ThreadLocalState& GetLocalState(size_t thread_index) {
              +    if (ARROW_PREDICT_FALSE(thread_index >= local_states_.size())) {
              +      size_t old_size = local_states_.size();
              +      local_states_.resize(thread_index + 1);
              +      for (size_t i = old_size; i < local_states_.size(); ++i) {
              +        local_states_[i].is_initialized = false;
              +        local_states_[i].is_has_match_initialized = false;
              +      }
              +    }
              +    return local_states_[thread_index];
              +  }

              Review comment:
                     The sizing is recomputed for each new exec plan, so the failure would only occur on plans that were running when the thread pool was resized. I am planning to look at a better implementation for use_threads=FALSE on Friday, using a serial executor that ensures an exec plan always has an executor and that exec plan steps always run on a thread belonging to that executor. This will solve all but the issue Antoine mentioned.

                 That being said, I think your solution is reasonable. I'll have to ping @bkietz and @michalursa, as they were the original proponents of the statically sized thread states. I don't know, however, whether that was based on speculation, existing literature, or actual benchmark measurements.
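
The serial-executor idea mentioned above might look roughly like the following sketch: tasks are queued and drained on the thread that drives the loop, so every step runs on a thread owned by the executor. `ToySerialExecutor`, `Spawn`, and `RunLoop` are hypothetical names chosen for illustration and are not taken from the Arrow code base.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <utility>

// Hypothetical serial executor: tasks are queued and drained on one thread.
class ToySerialExecutor {
 public:
  using Task = std::function<void()>;

  // Queue a task; a running task may call Spawn() to schedule follow-up work.
  void Spawn(Task task) {
    assert(static_cast<bool>(task));  // reject empty callables
    tasks_.push(std::move(task));
  }

  // Run queued tasks until none remain, all on the thread calling RunLoop().
  // Returns the number of tasks executed.
  int RunLoop() {
    int executed = 0;
    while (!tasks_.empty()) {
      // Move the task out before running it, since it may push new tasks.
      Task task = std::move(tasks_.front());
      tasks_.pop();
      task();
      ++executed;
    }
    return executed;
  }

 private:
  std::queue<Task> tasks_;
};
```

Because downstream work scheduled from inside a task is drained by the same loop, a plan driven this way would only ever see a single thread index, which is why static sizing would be safe in that mode.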




            githubbot ASF GitHub Bot logged work - 28/Mar/22 17:09

            People

              wjones127 Will Jones
              Votes: 0
              Watchers: 7

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 5h 40m