  1. Apache Arrow
  2. ARROW-9888

[Rust] [DataFusion] ExecutionContext can not be shared between threads


Details

    Description

      As suggested by Jorge on https://github.com/apache/arrow/pull/8079

      The high-level idea is to allow an ExecutionContext to be used in multi-threaded environments, such as Python.

      The two use-cases:

      1. When a project needs to create many plans that depend on a common set of sources and UDFs, it would be nice to be able to multi-thread the planning. This is particularly important when planning requires reading remote metadata (e.g. when the source is in S3 with many partitions). Metadata reading is often slow and network-bound, which makes threads suitable for these workloads. If multi-threading is not possible, either each plan needs to read the metadata independently (one context per plan) or planning must be sequential (with lots of network waiting).

      2. When creating bindings to programming languages that support multi-threading, it would be nice for the ExecutionContext to be thread-safe, so that we can more easily integrate with those languages.
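
      To illustrate the first use-case, here is a minimal sketch using only the Rust standard library. `SharedContext`, `load_context`, `plan`, and `plan_all` are hypothetical stand-ins, not DataFusion APIs, and the remote metadata read is simulated by a counter. It shows the metadata being read once and then reused by several planning threads:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical context holding metadata that is expensive to fetch.
struct SharedContext {
    partitions: Vec<String>,
}

// Simulates the slow, network-bound metadata read and counts each call.
fn load_context(reads: &AtomicUsize) -> SharedContext {
    reads.fetch_add(1, Ordering::SeqCst);
    SharedContext {
        partitions: (0..4).map(|i| format!("part-{}", i)).collect(),
    }
}

// Stand-in for planning: only needs shared (read-only) access to the context.
fn plan(ctx: &SharedContext, query: &str) -> String {
    format!("{} over {} partitions", query, ctx.partitions.len())
}

// Reads the metadata once, then plans every query on its own thread.
fn plan_all(queries: Vec<String>, reads: &AtomicUsize) -> Vec<String> {
    let ctx = Arc::new(load_context(reads));
    let handles: Vec<_> = queries
        .into_iter()
        .map(|q| {
            let ctx = Arc::clone(&ctx);
            thread::spawn(move || plan(&ctx, &q))
        })
        .collect();
    // Join in spawn order so the results line up with the input queries.
    handles
        .into_iter()
        .map(|h| h.join().expect("planning thread panicked"))
        .collect()
}

fn main() {
    let reads = AtomicUsize::new(0);
    let queries = vec!["q1".to_string(), "q2".to_string(), "q3".to_string()];
    for p in plan_all(queries, &reads) {
        println!("{}", p);
    }
    println!("metadata reads: {}", reads.load(Ordering::SeqCst));
}
```

      With one context per plan, the simulated read would instead happen once per query.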

      A test that shares one ExecutionContext between threads might look like:

      alamb@MacBook-Pro rust % git diff
      diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
      index 5f8aa342e..7374b0a78 100644
      --- a/rust/datafusion/src/execution/context.rs
      +++ b/rust/datafusion/src/execution/context.rs
      @@ -460,7 +460,7 @@ mod tests {
           use arrow::array::{ArrayRef, Int32Array};
           use arrow::compute::add;
           use std::fs::File;
      -    use std::io::prelude::*;
      +    use std::{sync::Mutex, io::prelude::*};
           use tempdir::TempDir;
           use test::*;
       
      @@ -928,6 +928,28 @@ mod tests {
               Ok(())
           }
       
      +    #[test]
      +    fn send_context_to_threads() -> Result<()> {
      +        // Ensure that an ExecutionContext can be used from multiple threads.
      +        let tmp_dir = TempDir::new("send_context_to_threads")?;
      +        let partition_count = 4;
      +        let ctx = Arc::new(Mutex::new(create_ctx(&tmp_dir, partition_count)?));
      +
      +        let threads: Vec<JoinHandle<Result<_>>> = (0..2)
      +            .map(|_| { ctx.clone() })
      +            .map(|ctx_clone| thread::spawn(move || {
      +                let ctx = ctx_clone.lock().expect("Locked context");
      +                // Ensure we can create logical plan code on a separate thread.
      +                ctx.create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")
      +            }))
      +            .collect();
      +
      +        for thread in threads {
      +            thread.join().expect("Failed to join thread")?;
      +        }
      +        Ok(())
      +    }
      +
           #[test]
           fn scalar_udf() -> Result<()> {
               let schema = Schema::new(vec![
      

      At the moment, Rust refuses to compile this example (and so refuses to share ExecutionContexts between threads) because several `dyn` trait objects held by the context are not marked as Send + Sync:

         Compiling datafusion v2.0.0-SNAPSHOT (/Users/alamb/Software/arrow/rust/datafusion)
      error[E0277]: `(dyn execution::physical_plan::PhysicalPlanner + 'static)` cannot be sent between threads safely
         --> datafusion/src/execution/context.rs:940:30
          |
      940 |             .map(|ctx_clone| thread::spawn(move || {
          |                              ^^^^^^^^^^^^^ `(dyn execution::physical_plan::PhysicalPlanner + 'static)` cannot be sent between threads safely
          | 
         ::: /Users/alamb/.rustup/toolchains/nightly-2020-04-22-x86_64-apple-darwin/lib/rustlib/src/rust/src/libstd/thread/mod.rs:616:8
          |
      616 |     F: Send + 'static,
          |        ---- required by this bound in `std::thread::spawn`
          |
          = help: the trait `std::marker::Send` is not implemented for `(dyn execution::physical_plan::PhysicalPlanner + 'static)`
          = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<(dyn execution::physical_plan::PhysicalPlanner + 'static)>`
          = note: required because it appears within the type `std::option::Option<std::sync::Arc<(dyn execution::physical_plan::PhysicalPlanner + 'static)>>`
          = note: required because it appears within the type `execution::context::ExecutionConfig`
          = note: required because it appears within the type `execution::context::ExecutionContextState`
          = note: required because it appears within the type `execution::context::ExecutionContext`
          = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Mutex<execution::context::ExecutionContext>`
          = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>`
          = note: required because it appears within the type `[closure@datafusion/src/execution/context.rs:940:44: 944:14 ctx_clone:std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>]`
      
      error[E0277]: `(dyn execution::physical_plan::PhysicalPlanner + 'static)` cannot be shared between threads safely
         --> datafusion/src/execution/context.rs:940:30
          |
      940 |             .map(|ctx_clone| thread::spawn(move || {
          |                              ^^^^^^^^^^^^^ `(dyn execution::physical_plan::PhysicalPlanner + 'static)` cannot be shared between threads safely
          | 
         ::: /Users/alamb/.rustup/toolchains/nightly-2020-04-22-x86_64-apple-darwin/lib/rustlib/src/rust/src/libstd/thread/mod.rs:616:8
          |
      616 |     F: Send + 'static,
          |        ---- required by this bound in `std::thread::spawn`
          |
          = help: the trait `std::marker::Sync` is not implemented for `(dyn execution::physical_plan::PhysicalPlanner + 'static)`
          = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<(dyn execution::physical_plan::PhysicalPlanner + 'static)>`
          = note: required because it appears within the type `std::option::Option<std::sync::Arc<(dyn execution::physical_plan::PhysicalPlanner + 'static)>>`
          = note: required because it appears within the type `execution::context::ExecutionConfig`
          = note: required because it appears within the type `execution::context::ExecutionContextState`
          = note: required because it appears within the type `execution::context::ExecutionContext`
          = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Mutex<execution::context::ExecutionContext>`
          = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>`
          = note: required because it appears within the type `[closure@datafusion/src/execution/context.rs:940:44: 944:14 ctx_clone:std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>]`
      
      error[E0277]: `(dyn datasource::datasource::TableProvider + 'static)` cannot be sent between threads safely
         --> datafusion/src/execution/context.rs:940:30
          |
      940 |             .map(|ctx_clone| thread::spawn(move || {
          |                              ^^^^^^^^^^^^^ `(dyn datasource::datasource::TableProvider + 'static)` cannot be sent between threads safely
          | 
         ::: /Users/alamb/.rustup/toolchains/nightly-2020-04-22-x86_64-apple-darwin/lib/rustlib/src/rust/src/libstd/thread/mod.rs:616:8
          |
      616 |     F: Send + 'static,
          |        ---- required by this bound in `std::thread::spawn`
          |
          = help: the trait `std::marker::Send` is not implemented for `(dyn datasource::datasource::TableProvider + 'static)`
          = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>`
          = note: required because it appears within the type `(std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>)`
          = note: required because of the requirements on the impl of `std::marker::Send` for `hashbrown::raw::RawTable<(std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>)>`
          = note: required because it appears within the type `hashbrown::map::HashMap<std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>, std::collections::hash_map::RandomState>`
          = note: required because it appears within the type `std::collections::HashMap<std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>>`
          = note: required because it appears within the type `execution::context::ExecutionContextState`
          = note: required because it appears within the type `execution::context::ExecutionContext`
          = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Mutex<execution::context::ExecutionContext>`
          = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>`
          = note: required because it appears within the type `[closure@datafusion/src/execution/context.rs:940:44: 944:14 ctx_clone:std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>]`
      
      error[E0277]: `(dyn datasource::datasource::TableProvider + 'static)` cannot be shared between threads safely
         --> datafusion/src/execution/context.rs:940:30
          |
      940 |             .map(|ctx_clone| thread::spawn(move || {
          |                              ^^^^^^^^^^^^^ `(dyn datasource::datasource::TableProvider + 'static)` cannot be shared between threads safely
          | 
         ::: /Users/alamb/.rustup/toolchains/nightly-2020-04-22-x86_64-apple-darwin/lib/rustlib/src/rust/src/libstd/thread/mod.rs:616:8
          |
      616 |     F: Send + 'static,
          |        ---- required by this bound in `std::thread::spawn`
          |
          = help: the trait `std::marker::Sync` is not implemented for `(dyn datasource::datasource::TableProvider + 'static)`
          = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>`
          = note: required because it appears within the type `(std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>)`
          = note: required because of the requirements on the impl of `std::marker::Send` for `hashbrown::raw::RawTable<(std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>)>`
          = note: required because it appears within the type `hashbrown::map::HashMap<std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>, std::collections::hash_map::RandomState>`
          = note: required because it appears within the type `std::collections::HashMap<std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>>`
          = note: required because it appears within the type `execution::context::ExecutionContextState`
          = note: required because it appears within the type `execution::context::ExecutionContext`
          = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Mutex<execution::context::ExecutionContext>`
          = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>`
          = note: required because it appears within the type `[closure@datafusion/src/execution/context.rs:940:44: 944:14 ctx_clone:std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>]`
      
         Compiling arrow-benchmarks v2.0.0-SNAPSHOT (/Users/alamb/Software/arrow/rust/benchmarks)
      error: aborting due to 4 previous errors
      
      For more information about this error, try `rustc --explain E0277`.
      error: could not compile `datafusion`.
      
      To learn more, run the command again with --verbose.
      warning: build failed, waiting for other jobs to finish...
      error: build failed
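
      One possible direction, sketched here with the standard library only, is to require `Send + Sync` as supertraits on the traits that the context stores as `dyn` objects. The `TableProvider`, `MemTable`, and `Context` types below are simplified stand-ins that reuse names from the error output for readability; they are assumptions for illustration and not DataFusion's actual definitions.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for a table source trait. The `Send + Sync`
// supertraits are the important part: they make `dyn TableProvider`
// itself Send + Sync, so `Arc<dyn TableProvider>` can cross threads.
trait TableProvider: Send + Sync {
    fn num_columns(&self) -> usize;
}

struct MemTable {
    columns: usize,
}

impl TableProvider for MemTable {
    fn num_columns(&self) -> usize {
        self.columns
    }
}

// Hypothetical stand-in for a context that owns trait objects.
struct Context {
    tables: HashMap<String, Arc<dyn TableProvider>>,
}

impl Context {
    fn new() -> Self {
        Context { tables: HashMap::new() }
    }

    fn register_table(&mut self, name: &str, table: Arc<dyn TableProvider>) {
        self.tables.insert(name.to_string(), table);
    }

    fn column_count(&self, name: &str) -> Option<usize> {
        self.tables.get(name).map(|t| t.num_columns())
    }
}

// Compile-time check: this only compiles when T is Send + Sync.
fn assert_send_sync<T: Send + Sync>() {}

// Shares one context between threads behind Arc<Mutex<..>>, as in the test.
fn lookup_from_threads(n_threads: usize) -> Vec<Option<usize>> {
    let mut ctx = Context::new();
    ctx.register_table("test", Arc::new(MemTable { columns: 2 }));
    let ctx = Arc::new(Mutex::new(ctx));

    let handles: Vec<_> = (0..n_threads)
        .map(|_| {
            let ctx = Arc::clone(&ctx);
            thread::spawn(move || {
                let guard = ctx.lock().expect("lock poisoned");
                guard.column_count("test")
            })
        })
        .collect();

    handles
        .into_iter()
        .map(|h| h.join().expect("thread panicked"))
        .collect()
}

fn main() {
    assert_send_sync::<Context>();
    println!("{:?}", lookup_from_threads(2));
}
```

      Removing `: Send + Sync` from the trait definition in this sketch should bring back E0277 errors of the same kind as above at the `thread::spawn` call.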
      

            People

              alamb Andrew Lamb
                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 50m