Details
Type: Bug
Status: Resolved
Priority: Blocker
Resolution: Fixed
Version: 3.0.0
Description
The complete failing test:
    use std::sync::Arc;
    use arrow::{array::{Int32Array, StringArray}, record_batch::RecordBatch};
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion::{datasource::MemTable, prelude::JoinType};
    use datafusion::error::Result;
    use datafusion::execution::context::ExecutionContext;

    #[tokio::test]
    async fn join() -> Result<()> {
        let schema1 = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, false),
            Field::new("b", DataType::Int32, false),
        ]));
        let schema2 = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, false),
            Field::new("c", DataType::Int32, false),
        ]));

        // define data.
        let batch1 = RecordBatch::try_new(
            schema1.clone(),
            vec![
                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
                Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
            ],
        )?;
        // define data.
        let batch2 = RecordBatch::try_new(
            schema2.clone(),
            vec![
                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
                Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
            ],
        )?;

        let mut ctx = ExecutionContext::new();

        let table1 = MemTable::new(schema1, vec![vec![batch1]])?;
        let table2 = MemTable::new(schema2, vec![vec![batch2]])?;

        ctx.register_table("aa", Box::new(table1));
        let df1 = ctx.table("aa")?;

        // "aaa" is registered only after df1 was created, so df1's copy of the state lacks it.
        ctx.register_table("aaa", Box::new(table2));
        let df2 = ctx.table("aaa")?;

        let a = df1.join(df2, JoinType::Inner, &["a"], &["a"])?;

        let batches = a.collect().await?;

        assert_eq!(batches.len(), 1);

        Ok(())
    }
When we create DataFrames via `ctx.table`, each one receives a clone of the `ExecutionContextState`. If the context registers a new table at a later stage, that table is not part of the state held by DataFrames created earlier. On a join, the left DataFrame's state is passed to the newly created DataFrame, which is then used in `collect()`. Because the right side references a table that is not in the left side's state, execution fails.
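The failure mode can be reproduced without DataFusion in a small std-only model. This is a hypothetical sketch, not DataFusion code: `ContextState`, `Context` and `DataFrame` below are toy stand-ins, and the "state" is just a set of registered table names rather than real table providers. It shows that a DataFrame built from a cloned snapshot cannot see tables registered afterwards, and that registering both tables before creating either DataFrame avoids the error.

```rust
use std::collections::HashSet;

// Hypothetical stand-in for ExecutionContextState: only a registry of table names.
#[derive(Clone, Default)]
struct ContextState {
    tables: HashSet<String>,
}

// Hypothetical stand-in for ExecutionContext.
struct Context {
    state: ContextState,
}

// Hypothetical stand-in for a DataFrame: a snapshot of the state plus the tables its plan scans.
struct DataFrame {
    state: ContextState,
    scans: Vec<String>,
}

impl Context {
    fn new() -> Self {
        Context { state: ContextState::default() }
    }

    fn register_table(&mut self, name: &str) {
        self.state.tables.insert(name.to_string());
    }

    // Like `ctx.table(..)` in the report: the DataFrame gets a *clone* of the state.
    fn table(&self, name: &str) -> DataFrame {
        DataFrame { state: self.state.clone(), scans: vec![name.to_string()] }
    }
}

impl DataFrame {
    // The joined DataFrame keeps the *left* side's state.
    fn join(&self, right: &DataFrame) -> DataFrame {
        let mut scans = self.scans.clone();
        scans.extend(right.scans.iter().cloned());
        DataFrame { state: self.state.clone(), scans }
    }

    // Execution must resolve every scanned table in this DataFrame's own state.
    fn collect(&self) -> Result<(), String> {
        for name in &self.scans {
            if !self.state.tables.contains(name) {
                return Err(format!("table '{}' not found", name));
            }
        }
        Ok(())
    }
}

// Same ordering as the failing test: "aaa" is registered after df1 exists.
fn failing_join() -> Result<(), String> {
    let mut ctx = Context::new();
    ctx.register_table("aa");
    let df1 = ctx.table("aa"); // df1's snapshot contains only "aa"
    ctx.register_table("aaa");
    let df2 = ctx.table("aaa");
    df1.join(&df2).collect()
}

// Registering both tables before creating any DataFrame sidesteps the problem.
fn reordered_join() -> Result<(), String> {
    let mut ctx = Context::new();
    ctx.register_table("aa");
    ctx.register_table("aaa");
    let df1 = ctx.table("aa");
    let df2 = ctx.table("aaa");
    df1.join(&df2).collect()
}

fn main() {
    println!("{:?}", failing_join()); // Err("table 'aaa' not found")
    println!("{:?}", reordered_join()); // Ok(())
}
```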
We may need an `Arc<Mutex<ExecutionContextState>>` to share a common mutable state across multiple DataFrames. Alternatively, we could stop requiring tables to be registered in the context in order to be used by DataFrames.
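The first option can be sketched with the same kind of toy model. Again, the types here are hypothetical stand-ins rather than DataFusion's API: the context and every DataFrame hold an `Arc<Mutex<ContextState>>`, so `Arc::clone` shares one registry instead of copying it, and a table registered after a DataFrame was created is still visible when that DataFrame executes.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for ExecutionContextState: only a registry of table names.
#[derive(Default)]
struct ContextState {
    tables: HashSet<String>,
}

struct Context {
    state: Arc<Mutex<ContextState>>,
}

struct DataFrame {
    state: Arc<Mutex<ContextState>>,
    scans: Vec<String>,
}

impl Context {
    fn new() -> Self {
        Context { state: Arc::new(Mutex::new(ContextState::default())) }
    }

    // Registration mutates the shared state, so existing DataFrames observe it.
    fn register_table(&self, name: &str) {
        self.state.lock().unwrap().tables.insert(name.to_string());
    }

    // The DataFrame shares the state via Arc::clone instead of a deep copy.
    fn table(&self, name: &str) -> DataFrame {
        DataFrame { state: Arc::clone(&self.state), scans: vec![name.to_string()] }
    }
}

impl DataFrame {
    fn join(&self, right: &DataFrame) -> DataFrame {
        let mut scans = self.scans.clone();
        scans.extend(right.scans.iter().cloned());
        DataFrame { state: Arc::clone(&self.state), scans }
    }

    // Tables are resolved against the shared state at execution time.
    fn collect(&self) -> Result<(), String> {
        let state = self.state.lock().unwrap();
        for name in &self.scans {
            if !state.tables.contains(name) {
                return Err(format!("table '{}' not found", name));
            }
        }
        Ok(())
    }
}

// Same ordering as the failing test, but now with shared state.
fn shared_state_join() -> Result<(), String> {
    let ctx = Context::new();
    ctx.register_table("aa");
    let df1 = ctx.table("aa");
    ctx.register_table("aaa"); // registered after df1 was created
    let df2 = ctx.table("aaa");
    df1.join(&df2).collect()
}

fn main() {
    println!("{:?}", shared_state_join()); // Ok(())
}
```

One trade-off of this design is that every DataFrame now observes later mutations of the context, not only newly added tables; a `RwLock` might also suit a registry that is read far more often than written. The second option (letting the plan hold the table source directly, so no registry lookup is needed) is not shown here.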
Note that the current example in the `DataFrame::join` docs works because the table it uses is registered for both DataFrames.