Apache Arrow / ARROW-12235

[Rust][DataFusion] LIMIT returns incorrect results when used with several small partitions

    Description

      While running some queries locally, I noticed that `LIMIT` was not behaving correctly. In my case, a query with `LIMIT 10` always returned zero rows.

      After some investigation I found a self-contained reproducer. The following test fails when added to the tests in `rust/datafusion/src/execution/context.rs`:

      
      
          /// Return a RecordBatch with a single Int32 array with values (0..sz)
          fn make_partition(sz: i32) -> RecordBatch {
              let seq_start = 0;
              let seq_end = sz;
              let values = (seq_start..seq_end).collect::<Vec<_>>();
              let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
              let arr = Arc::new(Int32Array::from(values));
              let arr = arr as ArrayRef;
      
              RecordBatch::try_new(schema, vec![arr]).unwrap()
          }
      
          #[tokio::test]
          async fn limit_multi_partitions() -> Result<()> {
              let tmp_dir = TempDir::new()?;
              let mut ctx = create_ctx(&tmp_dir, 1)?;
      
              // six partitions holding 0, 1, 2, 3, 4 and 5 rows (15 rows in total)
              let partitions = vec![
                  vec![make_partition(0)],
                  vec![make_partition(1)],
                  vec![make_partition(2)],
                  vec![make_partition(3)],
                  vec![make_partition(4)],
                  vec![make_partition(5)],
              ];
              let schema = partitions[0][0].schema();
              let provider = Arc::new(MemTable::try_new(schema, partitions).unwrap());
      
              ctx.register_table("t", provider).unwrap();
      
              // select all rows
              let results = plan_and_collect(&mut ctx, "SELECT i FROM t")
                  .await
                  .unwrap();
      
              let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
              assert_eq!(num_rows, 15);
      
              // each LIMIT from 1 through 9 should return exactly that many rows
              for limit in 1..10 {
                  let query = format!("SELECT i FROM t limit {}", limit);
                  let results = plan_and_collect(&mut ctx, &query)
                      .await
                      .unwrap();
      
                  let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
                  assert_eq!(num_rows, limit, "mismatch with query {}", query);
              }
      
              Ok(())
          }
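
For reference, the behavior the test expects can be sketched without DataFusion at all. The following is only an illustration of the intended `LIMIT` semantics across several small partitions, not DataFusion's implementation: `apply_limit` is a hypothetical helper, and plain `Vec<i32>` values stand in for the single-batch partitions built by `make_partition` above.

```rust
/// Stand-in for a partition: a plain vector holding the values 0..sz.
/// (The real reproducer builds a RecordBatch; this sketch uses only std.)
fn make_partition(sz: i32) -> Vec<i32> {
    (0..sz).collect()
}

/// Hypothetical helper showing the expected semantics of `LIMIT n`:
/// keep taking rows partition by partition until `limit` rows have been
/// collected. Empty or short partitions must not end the scan early.
fn apply_limit(partitions: &[Vec<i32>], limit: usize) -> Vec<i32> {
    let mut out = Vec::with_capacity(limit);
    for part in partitions {
        // out.len() never exceeds limit, so this subtraction cannot underflow
        let remaining = limit - out.len();
        if remaining == 0 {
            break;
        }
        out.extend(part.iter().copied().take(remaining));
    }
    out
}
```

With the six partitions from the test (0 through 5 rows), every limit from 1 to 9 should yield exactly that many rows, even though the first partition is empty and the next few are smaller than the limit.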
      
      

            People

              alamb Andrew Lamb
                Time Tracking

                  Original Estimate: Not Specified
                  Remaining Estimate: 0h
                  Time Spent: 50m