Details
Type: Bug
Status: Resolved
Priority: Minor
Resolution: Fixed
None
Description
As suggested by Jorge on https://github.com/apache/arrow/pull/8079.
The high-level idea is to allow an ExecutionContext to be used in multi-threaded environments such as Python.
There are two use cases:
1. When a project needs to create many plans that depend on a common set of sources and UDFs, it would be useful to parallelize planning across threads. This is particularly important when planning requires reading remote metadata (e.g. when the source is in S3 with many partitions). Metadata reading is often slow and network-bound, which makes threads well suited to these workloads. Without multi-threading, either each plan must read the metadata independently (one context per plan) or planning must be sequential (with a lot of time spent waiting on the network).
2. When creating bindings for programming languages that support multi-threading, it would be useful for the ExecutionContext to be thread-safe, so that it integrates more easily with those languages.
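To illustrate use case 1, here is a minimal sketch in plain Rust (standard library only) of several threads planning against one shared context, so that remote metadata is fetched once per table rather than once per plan. `SharedContext`, `TableMeta`, `plan_concurrently`, and the `sleep` standing in for a network call are all hypothetical; this is not DataFusion's API. It relies on the shared context being `Send + Sync`, which is exactly what this issue asks of ExecutionContext.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for remote table metadata (e.g. an S3 partition listing).
struct TableMeta {
    partitions: usize,
}

// Hypothetical shared planning context: every planning thread sees the same
// metadata cache, so each table's metadata is fetched at most once.
struct SharedContext {
    cache: Mutex<HashMap<String, Arc<TableMeta>>>,
    remote_fetches: AtomicUsize,
}

impl SharedContext {
    fn new() -> Self {
        SharedContext {
            cache: Mutex::new(HashMap::new()),
            remote_fetches: AtomicUsize::new(0),
        }
    }

    // Returns cached metadata, or simulates a slow network fetch on a miss.
    // The lock is held across the fetch, so concurrent callers asking for the
    // same table wait for the first fetch instead of repeating it.
    fn table_meta(&self, table: &str) -> Arc<TableMeta> {
        let mut cache = self.cache.lock().expect("cache lock poisoned");
        if let Some(meta) = cache.get(table) {
            return Arc::clone(meta);
        }
        thread::sleep(Duration::from_millis(50)); // stand-in for network latency
        self.remote_fetches.fetch_add(1, Ordering::SeqCst);
        let meta = Arc::new(TableMeta { partitions: 4 });
        cache.insert(table.to_string(), Arc::clone(&meta));
        meta
    }

    // Hypothetical planning step: needs only `&self`, so any thread can call it.
    fn plan(&self, table: &str) -> String {
        let meta = self.table_meta(table);
        format!("Scan {} ({} partitions)", table, meta.partitions)
    }
}

// Plans one query per entry in `tables`, each on its own thread, sharing one context.
fn plan_concurrently(ctx: Arc<SharedContext>, tables: &[&str]) -> Vec<String> {
    let handles: Vec<thread::JoinHandle<String>> = tables
        .iter()
        .map(|t| {
            let ctx = Arc::clone(&ctx);
            let table = t.to_string();
            thread::spawn(move || ctx.plan(&table))
        })
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().expect("planning thread panicked"))
        .collect()
}
```

Calling `plan_concurrently` with the tables `["sales", "sales", "users", "sales"]` returns four plans while `remote_fetches` ends at 2, one per distinct table. Holding the cache lock across the fetch keeps the sketch simple but serializes fetches of different tables; a per-table lock or a once-cell per cache entry would avoid that.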
A test for this might look like the following (a diff against `rust/datafusion/src/execution/context.rs`):
    alamb@MacBook-Pro rust % git diff
    diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
    index 5f8aa342e..7374b0a78 100644
    --- a/rust/datafusion/src/execution/context.rs
    +++ b/rust/datafusion/src/execution/context.rs
    @@ -460,7 +460,7 @@ mod tests {
         use arrow::array::{ArrayRef, Int32Array};
         use arrow::compute::add;
         use std::fs::File;
    -    use std::io::prelude::*;
    +    use std::{sync::Mutex, io::prelude::*};
         use tempdir::TempDir;
         use test::*;
     
    @@ -928,6 +928,28 @@ mod tests {
             Ok(())
         }
     
    +    #[test]
    +    fn send_context_to_threads() -> Result<()> {
    +        // ensure that ExecutionContext's can be read by multiple threads concurrently
    +        let tmp_dir = TempDir::new("send_context_to_threads")?;
    +        let partition_count = 4;
    +        let mut ctx = Arc::new(Mutex::new(create_ctx(&tmp_dir, partition_count)?));
    +
    +        let threads: Vec<JoinHandle<Result<_>>> = (0..2)
    +            .map(|_| { ctx.clone() })
    +            .map(|ctx_clone| thread::spawn(move || {
    +                let ctx = ctx_clone.lock().expect("Locked context");
    +                // Ensure we can create logical plan code on a separate thread.
    +                ctx.create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")
    +            }))
    +            .collect();
    +
    +        for thread in threads {
    +            thread.join().expect("Failed to join thread")?;
    +        }
    +        Ok(())
    +    }
    +
         #[test]
         fn scalar_udf() -> Result<()> {
             let schema = Schema::new(vec![
At the moment, Rust refuses to compile this example (and, more generally, refuses to let ExecutionContexts be shared between threads) because several `dyn` trait objects held inside the context, namely `PhysicalPlanner` and `TableProvider`, are not marked as `Send + Sync`:
       Compiling datafusion v2.0.0-SNAPSHOT (/Users/alamb/Software/arrow/rust/datafusion)
    error[E0277]: `(dyn execution::physical_plan::PhysicalPlanner + 'static)` cannot be sent between threads safely
       --> datafusion/src/execution/context.rs:940:30
        |
    940 |             .map(|ctx_clone| thread::spawn(move || {
        |                              ^^^^^^^^^^^^^ `(dyn execution::physical_plan::PhysicalPlanner + 'static)` cannot be sent between threads safely
        |
       ::: /Users/alamb/.rustup/toolchains/nightly-2020-04-22-x86_64-apple-darwin/lib/rustlib/src/rust/src/libstd/thread/mod.rs:616:8
        |
    616 |     F: Send + 'static,
        |        ---- required by this bound in `std::thread::spawn`
        |
        = help: the trait `std::marker::Send` is not implemented for `(dyn execution::physical_plan::PhysicalPlanner + 'static)`
        = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<(dyn execution::physical_plan::PhysicalPlanner + 'static)>`
        = note: required because it appears within the type `std::option::Option<std::sync::Arc<(dyn execution::physical_plan::PhysicalPlanner + 'static)>>`
        = note: required because it appears within the type `execution::context::ExecutionConfig`
        = note: required because it appears within the type `execution::context::ExecutionContextState`
        = note: required because it appears within the type `execution::context::ExecutionContext`
        = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Mutex<execution::context::ExecutionContext>`
        = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>`
        = note: required because it appears within the type `[closure@datafusion/src/execution/context.rs:940:44: 944:14 ctx_clone:std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>]`

    error[E0277]: `(dyn execution::physical_plan::PhysicalPlanner + 'static)` cannot be shared between threads safely
       --> datafusion/src/execution/context.rs:940:30
        |
    940 |             .map(|ctx_clone| thread::spawn(move || {
        |                              ^^^^^^^^^^^^^ `(dyn execution::physical_plan::PhysicalPlanner + 'static)` cannot be shared between threads safely
        |
       ::: /Users/alamb/.rustup/toolchains/nightly-2020-04-22-x86_64-apple-darwin/lib/rustlib/src/rust/src/libstd/thread/mod.rs:616:8
        |
    616 |     F: Send + 'static,
        |        ---- required by this bound in `std::thread::spawn`
        |
        = help: the trait `std::marker::Sync` is not implemented for `(dyn execution::physical_plan::PhysicalPlanner + 'static)`
        = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<(dyn execution::physical_plan::PhysicalPlanner + 'static)>`
        = note: required because it appears within the type `std::option::Option<std::sync::Arc<(dyn execution::physical_plan::PhysicalPlanner + 'static)>>`
        = note: required because it appears within the type `execution::context::ExecutionConfig`
        = note: required because it appears within the type `execution::context::ExecutionContextState`
        = note: required because it appears within the type `execution::context::ExecutionContext`
        = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Mutex<execution::context::ExecutionContext>`
        = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>`
        = note: required because it appears within the type `[closure@datafusion/src/execution/context.rs:940:44: 944:14 ctx_clone:std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>]`

    error[E0277]: `(dyn datasource::datasource::TableProvider + 'static)` cannot be sent between threads safely
       --> datafusion/src/execution/context.rs:940:30
        |
    940 |             .map(|ctx_clone| thread::spawn(move || {
        |                              ^^^^^^^^^^^^^ `(dyn datasource::datasource::TableProvider + 'static)` cannot be sent between threads safely
        |
       ::: /Users/alamb/.rustup/toolchains/nightly-2020-04-22-x86_64-apple-darwin/lib/rustlib/src/rust/src/libstd/thread/mod.rs:616:8
        |
    616 |     F: Send + 'static,
        |        ---- required by this bound in `std::thread::spawn`
        |
        = help: the trait `std::marker::Send` is not implemented for `(dyn datasource::datasource::TableProvider + 'static)`
        = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>`
        = note: required because it appears within the type `(std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>)`
        = note: required because of the requirements on the impl of `std::marker::Send` for `hashbrown::raw::RawTable<(std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>)>`
        = note: required because it appears within the type `hashbrown::map::HashMap<std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>, std::collections::hash_map::RandomState>`
        = note: required because it appears within the type `std::collections::HashMap<std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>>`
        = note: required because it appears within the type `execution::context::ExecutionContextState`
        = note: required because it appears within the type `execution::context::ExecutionContext`
        = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Mutex<execution::context::ExecutionContext>`
        = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>`
        = note: required because it appears within the type `[closure@datafusion/src/execution/context.rs:940:44: 944:14 ctx_clone:std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>]`

    error[E0277]: `(dyn datasource::datasource::TableProvider + 'static)` cannot be shared between threads safely
       --> datafusion/src/execution/context.rs:940:30
        |
    940 |             .map(|ctx_clone| thread::spawn(move || {
        |                              ^^^^^^^^^^^^^ `(dyn datasource::datasource::TableProvider + 'static)` cannot be shared between threads safely
        |
       ::: /Users/alamb/.rustup/toolchains/nightly-2020-04-22-x86_64-apple-darwin/lib/rustlib/src/rust/src/libstd/thread/mod.rs:616:8
        |
    616 |     F: Send + 'static,
        |        ---- required by this bound in `std::thread::spawn`
        |
        = help: the trait `std::marker::Sync` is not implemented for `(dyn datasource::datasource::TableProvider + 'static)`
        = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>`
        = note: required because it appears within the type `(std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>)`
        = note: required because of the requirements on the impl of `std::marker::Send` for `hashbrown::raw::RawTable<(std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>)>`
        = note: required because it appears within the type `hashbrown::map::HashMap<std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>, std::collections::hash_map::RandomState>`
        = note: required because it appears within the type `std::collections::HashMap<std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>>`
        = note: required because it appears within the type `execution::context::ExecutionContextState`
        = note: required because it appears within the type `execution::context::ExecutionContext`
        = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Mutex<execution::context::ExecutionContext>`
        = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>`
        = note: required because it appears within the type `[closure@datafusion/src/execution/context.rs:940:44: 944:14 ctx_clone:std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>]`

       Compiling arrow-benchmarks v2.0.0-SNAPSHOT (/Users/alamb/Software/arrow/rust/benchmarks)
    error: aborting due to 4 previous errors

    For more information about this error, try `rustc --explain E0277`.
    error: could not compile `datafusion`.

    To learn more, run the command again with --verbose.
    warning: build failed, waiting for other jobs to finish...
    error: build failed
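One way to make an example like the test above compile is to require `Send + Sync` on the traits behind the `dyn` objects named in these errors. Then `Arc<dyn Trait>` is itself `Send + Sync`, and the whole chain in the notes (config, state, context, `Mutex`, `Arc`) follows. The sketch below assumes that approach. It uses simplified, hypothetical stand-ins that only borrow the names `PhysicalPlanner`, `TableProvider`, `ExecutionConfig`, `ExecutionContextState`, and `ExecutionContext` from the error output; they are not DataFusion's real definitions. `describe`, `sample_context`, and `plan_on_threads` are likewise invented for illustration.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical, simplified stand-ins for the traits named in the errors.
// Declaring `Send + Sync` as supertraits makes every `dyn PhysicalPlanner`
// and `dyn TableProvider` object carry those bounds.
trait PhysicalPlanner: Send + Sync {
    fn name(&self) -> String;
}

trait TableProvider: Send + Sync {
    fn row_count(&self) -> usize;
}

struct DefaultPlanner;

impl PhysicalPlanner for DefaultPlanner {
    fn name(&self) -> String {
        "default".to_string()
    }
}

struct MemTable {
    rows: usize,
}

impl TableProvider for MemTable {
    fn row_count(&self) -> usize {
        self.rows
    }
}

// Same nesting as the compiler notes: context -> state -> config / table map.
struct ExecutionConfig {
    physical_planner: Option<Arc<dyn PhysicalPlanner>>,
}

struct ExecutionContextState {
    datasources: HashMap<String, Arc<dyn TableProvider>>,
    config: ExecutionConfig,
}

struct ExecutionContext {
    state: ExecutionContextState,
}

// Compile-time check: fails with E0277 if `ExecutionContext` is not `Send + Sync`.
fn assert_send_sync<T: Send + Sync>() {}
const _: fn() = || assert_send_sync::<ExecutionContext>();

fn sample_context() -> ExecutionContext {
    let mut datasources: HashMap<String, Arc<dyn TableProvider>> = HashMap::new();
    datasources.insert("test".to_string(), Arc::new(MemTable { rows: 3 }));
    ExecutionContext {
        state: ExecutionContextState {
            datasources,
            config: ExecutionConfig {
                physical_planner: Some(Arc::new(DefaultPlanner)),
            },
        },
    }
}

// Read-only "planning" step run by each thread.
fn describe(ctx: &ExecutionContext, table: &str) -> String {
    let planner = ctx
        .state
        .config
        .physical_planner
        .as_ref()
        .map(|p| p.name())
        .unwrap_or_else(|| "none".to_string());
    let rows = ctx.state.datasources.get(table).map(|t| t.row_count()).unwrap_or(0);
    format!("{} planner, {} rows in {}", planner, rows, table)
}

// Same shape as the proposed test: clone an Arc<Mutex<_>> into each spawned thread.
fn plan_on_threads(ctx: Arc<Mutex<ExecutionContext>>, n: usize) -> Vec<String> {
    let handles: Vec<thread::JoinHandle<String>> = (0..n)
        .map(|_| Arc::clone(&ctx))
        .map(|ctx_clone| {
            thread::spawn(move || {
                let ctx = ctx_clone.lock().expect("Locked context");
                describe(&ctx, "test")
            })
        })
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().expect("Failed to join thread"))
        .collect()
}
```

With the sample context, `plan_on_threads(ctx, 2)` returns two copies of `"default planner, 3 rows in test"`. Removing `Send + Sync` from either trait makes the `const _` check (and the `thread::spawn` call) fail with E0277 errors like the ones above. Because the context in this sketch is also `Sync` and planning only needs `&self`, the `Mutex` is needed only if threads mutate the context; read-only planning could share an `Arc<ExecutionContext>` directly.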
Issue Links
- relates to: ARROW-9425 [Rust][DataFusion] Make ExecutionContext sharable between threads (Resolved)
- links to