Apache Arrow / ARROW-10844

[Rust] [DataFusion] join of two DataFrames is not possible


Details

    Description

      The complete failing test:

       

      use std::sync::Arc;
      
      use arrow::{array::{Int32Array, StringArray}, record_batch::RecordBatch};
      use arrow::datatypes::{DataType, Field, Schema};
      
      use datafusion::{datasource::MemTable, prelude::JoinType};
      use datafusion::error::Result;
      
      use datafusion::execution::context::ExecutionContext;
      
      #[tokio::test]
      async fn join() -> Result<()> {
          let schema1 = Arc::new(Schema::new(vec![
              Field::new("a", DataType::Utf8, false),
              Field::new("b", DataType::Int32, false),
          ]));
          let schema2 = Arc::new(Schema::new(vec![
              Field::new("a", DataType::Utf8, false),
              Field::new("c", DataType::Int32, false),
          ]));
      
          // Batch for the left table (schema1: columns "a" and "b").
          let batch1 = RecordBatch::try_new(
              schema1.clone(),
              vec![
                  Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
                  Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
              ],
          )?;
          // Batch for the right table (schema2: columns "a" and "c").
          let batch2 = RecordBatch::try_new(
              schema2.clone(),
              vec![
                  Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
                  Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
              ],
          )?;
      
          let mut ctx = ExecutionContext::new();
      
          let table1 = MemTable::new(schema1, vec![vec![batch1]])?;
          let table2 = MemTable::new(schema2, vec![vec![batch2]])?;
      
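          // Register the left table and create df1; df1 receives a clone of the
          // context state as it is at this point.
          ctx.register_table("aa", Box::new(table1));
      
          let df1 = ctx.table("aa")?;
      
          // Register the right table only after df1 exists, so df1's state
          // does not contain "aaa".
          ctx.register_table("aaa", Box::new(table2));
      
          let df2 = ctx.table("aaa")?;
      
          // The joined DataFrame is created with the left (df1) state.
          let a = df1.join(df2, JoinType::Inner, &["a"], &["a"])?;
      
          // Fails here: "aaa" is not part of the state used by collect().
          let batches = a.collect().await?;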
          assert_eq!(batches.len(), 1);
      
          Ok(())
      }
      

       

      When DataFrames are created via `ctx.table`, each one receives a clone of the ExecutionContextState. If the context receives a new table at a later stage, that table is not part of the state held by the first DataFrame. On a join, the left DataFrame's state is passed to the newly created DataFrame, and that state is then used in collect(). Because the right side refers to a table that is not in the left side's state, execution fails.
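      The mechanism can be reproduced in miniature without DataFusion. The sketch below is a std-only model, not DataFusion code: `State`, `Context` and `Frame` are hypothetical stand-ins for ExecutionContextState, ExecutionContext and DataFrame, and the table map stores a row count instead of a table provider. It assumes, as described above, that `table` clones the state and that `join` keeps only the left state.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for ExecutionContextState: table name -> row count.
#[derive(Clone, Default)]
struct State {
    tables: HashMap<String, usize>,
}

// Hypothetical stand-in for ExecutionContext.
#[derive(Default)]
struct Context {
    state: State,
}

// Hypothetical stand-in for a DataFrame: the tables its plan scans,
// plus a snapshot of the state taken when the frame was created.
struct Frame {
    scans: Vec<String>,
    state: State,
}

impl Context {
    fn register_table(&mut self, name: &str, rows: usize) {
        self.state.tables.insert(name.to_string(), rows);
    }

    // Each frame gets a clone of the state as it is right now.
    fn table(&self, name: &str) -> Frame {
        Frame {
            scans: vec![name.to_string()],
            state: self.state.clone(),
        }
    }
}

impl Frame {
    // The joined frame scans both inputs but keeps only the left state.
    fn join(self, right: Frame) -> Frame {
        let mut scans = self.scans;
        scans.extend(right.scans);
        Frame { scans, state: self.state }
    }

    // Execution resolves every scanned table against the frame's own state.
    fn collect(&self) -> Result<(), String> {
        for name in &self.scans {
            if !self.state.tables.contains_key(name) {
                return Err(format!("table '{}' not found in state", name));
            }
        }
        Ok(())
    }
}

// Same ordering as the failing test: "aaa" is registered after df1 exists.
fn late_registration_join() -> Result<(), String> {
    let mut ctx = Context::default();
    ctx.register_table("aa", 4);
    let df1 = ctx.table("aa");
    ctx.register_table("aaa", 4);
    let df2 = ctx.table("aaa");
    df1.join(df2).collect()
}

// Both tables are registered before any frame is created.
fn early_registration_join() -> Result<(), String> {
    let mut ctx = Context::default();
    ctx.register_table("aa", 4);
    ctx.register_table("aaa", 4);
    let df1 = ctx.table("aa");
    let df2 = ctx.table("aaa");
    df1.join(df2).collect()
}
```

      In this model `late_registration_join()` returns an error for "aaa", while `early_registration_join()` succeeds because the left snapshot already contains both tables. Whether reordering the registrations avoids the failure in DataFusion itself is not verified here.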

       

      We may need an Arc<Mutex<ExecutionContextState>> to share a common mutable state across multiple DataFrames. Alternatively, we could stop requiring tables to be registered in the context before DataFrames can use them.
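      A minimal sketch of the first option, again as a std-only model rather than a patch to DataFusion: `State`, `SharedContext` and `SharedFrame` are hypothetical names, and the map stores a row count instead of a table provider. The only point it illustrates is that handing out an `Arc<Mutex<..>>` instead of a clone makes later registrations visible to frames that already exist.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for ExecutionContextState: table name -> row count.
#[derive(Default)]
struct State {
    tables: HashMap<String, usize>,
}

// Hypothetical context that owns the state behind a shared, mutable handle.
#[derive(Default)]
struct SharedContext {
    state: Arc<Mutex<State>>,
}

// Hypothetical frame that holds a handle to the same state, not a copy.
struct SharedFrame {
    scans: Vec<String>,
    state: Arc<Mutex<State>>,
}

impl SharedContext {
    fn register_table(&self, name: &str, rows: usize) {
        let mut state = self.state.lock().unwrap();
        state.tables.insert(name.to_string(), rows);
    }

    // Cloning the Arc shares the state; it does not snapshot it.
    fn table(&self, name: &str) -> SharedFrame {
        SharedFrame {
            scans: vec![name.to_string()],
            state: Arc::clone(&self.state),
        }
    }
}

impl SharedFrame {
    // Keeping the left handle is harmless here: both sides point at one state.
    fn join(self, right: SharedFrame) -> SharedFrame {
        let mut scans = self.scans;
        scans.extend(right.scans);
        SharedFrame { scans, state: self.state }
    }

    // Resolve every scanned table against the shared state at execution time.
    fn collect(&self) -> Result<(), String> {
        let state = self.state.lock().unwrap();
        for name in &self.scans {
            if !state.tables.contains_key(name) {
                return Err(format!("table '{}' not found in state", name));
            }
        }
        Ok(())
    }
}

// Same ordering as the failing test: "aaa" is registered after df1 exists.
fn shared_state_join() -> Result<(), String> {
    let ctx = SharedContext::default();
    ctx.register_table("aa", 4);
    let df1 = ctx.table("aa");
    ctx.register_table("aaa", 4);
    let df2 = ctx.table("aaa");
    df1.join(df2).collect()
}
```

      With the shared handle, `shared_state_join()` succeeds even though "aaa" is registered after `df1` is created. The cost of this design is a lock on every state access, and frames are no longer isolated from later changes to the context.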

      Note that the current example in `DataFrame::join` docs works because the table is registered for both DataFrames.


            People

              jorgecarleitao Jorge Leitão


                Time Tracking

                  Original Estimate: Not Specified
                  Remaining Estimate: 0h
                  Time Spent: 2h 10m