  1. Apache Arrow
  2. ARROW-9937

[Rust] [DataFusion] Average is not correct

Details

    Description

      The current design of aggregates makes the calculation of the average incorrect.

      Namely, if there are multiple input partitions, the result is the average of the per-partition averages. For example, if the input is in two batches, [1,2,3] and [4,5], DataFusion will say "average=3.25" rather than "average=3".

      It also makes it impossible to compute the geometric mean, distinct sum, and other operations.

      The central issue is that Accumulator returns a `ScalarValue` during partial aggregations via get_value, but very often a `ScalarValue` is not sufficient information to perform the full aggregation.
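
To illustrate why a single scalar per partition is not enough, here is a minimal sketch for a distinct sum. It is plain Rust and does not use DataFusion's types; the function names `partial_distinct`, `distinct_sum` and `sum_of_partial_sums` are hypothetical and exist only for this example. The partial state has to be the set of values seen, because a partial sum forgets which values were already counted.

```rust
use std::collections::HashSet;

/// Partial state for a distinct sum: the set of values seen in one batch.
/// (Hypothetical helper, not part of DataFusion.)
fn partial_distinct(batch: &[i64]) -> HashSet<i64> {
    batch.iter().copied().collect()
}

/// Final aggregation: merge the partial sets, then sum each value once.
fn distinct_sum(partials: &[HashSet<i64>]) -> i64 {
    let mut merged: HashSet<i64> = HashSet::new();
    for p in partials {
        merged.extend(p.iter().copied());
    }
    merged.iter().sum()
}

/// What a scalar-only partial state yields: the sum of per-batch distinct sums.
/// Values that occur in more than one batch are counted more than once.
fn sum_of_partial_sums(batches: &[&[i64]]) -> i64 {
    batches
        .iter()
        .map(|b| partial_distinct(b).iter().sum::<i64>())
        .sum()
}

fn main() {
    // The value 2 appears in both batches.
    let batches: [&[i64]; 2] = [&[1, 2, 2], &[2, 3]];
    let partials: Vec<HashSet<i64>> =
        batches.iter().map(|b| partial_distinct(b)).collect();
    println!("merging sets:    {}", distinct_sum(&partials));
    println!("merging scalars: {}", sum_of_partial_sums(&batches));
}
```

The two results differ whenever a value occurs in more than one batch, which is the information a scalar partial result cannot carry.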

      A simple example is the average of 5 numbers, x1, x2, x3, x4, x5, that are distributed in batches of 2, {[x1, x2], [x3, x4], [x5]}. Our current calculation performs partial means, {(x1+x2)/2, (x3+x4)/2, x5}, and then reduces them using another average, i.e. ((x1+x2)/2 + (x3+x4)/2 + x5)/3, which is not equal to (x1 + x2 + x3 + x4 + x5)/5.
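
The arithmetic above can be checked with concrete numbers. The following is a small standalone sketch (plain Rust, no DataFusion types; all function names are made up for the example) that takes x1..x5 = 1..5 in batches of 2 and compares the mean of the partial means with a reduction that carries a (sum, count) pair per batch.

```rust
/// Mean of a non-empty slice.
fn mean(values: &[f64]) -> f64 {
    values.iter().sum::<f64>() / values.len() as f64
}

/// The behaviour described in this issue: average each batch,
/// then average the per-batch averages.
fn mean_of_partial_means(batches: &[Vec<f64>]) -> f64 {
    let partial: Vec<f64> = batches.iter().map(|b| mean(b)).collect();
    mean(&partial)
}

/// Carry (sum, count) out of each batch and divide only once at the end.
fn mean_from_sum_and_count(batches: &[Vec<f64>]) -> f64 {
    let (sum, count) = batches
        .iter()
        .fold((0.0_f64, 0_usize), |(s, c), b| {
            (s + b.iter().sum::<f64>(), c + b.len())
        });
    sum / count as f64
}

fn main() {
    // {[x1, x2], [x3, x4], [x5]} with x_i = i
    let batches = vec![vec![1.0, 2.0], vec![3.0, 4.0], vec![5.0]];
    println!("mean of partial means: {}", mean_of_partial_means(&batches));
    println!("mean from (sum, count): {}", mean_from_sum_and_count(&batches));
}
```

The partial means are 1.5, 3.5 and 5, so the first function returns 10/3, while the second returns 15/5 = 3.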

      I believe that our Accumulators need to pass more information from the partial aggregations to the final aggregation.

      We could consider adopting an API equivalent to [Spark's](https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html), i.e. have an `update`, a `merge` and an `evaluate`.
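
A rough sketch of what such an interface could look like for the average is given below. This is only an illustration of the `update` / `merge` / `evaluate` idea: the trait shape, the associated `State` type, the `state` method and the `Avg` struct are all assumptions made for this example, not DataFusion's existing `Accumulator` and not a proposed final design.

```rust
/// Hypothetical accumulator interface (illustrative names, not DataFusion's API).
trait Accumulator {
    /// Intermediate state shipped from a partial to the final aggregation.
    type State;
    /// Fold one input value into this accumulator.
    fn update(&mut self, value: f64);
    /// Export the intermediate state of a partial aggregation.
    fn state(&self) -> Self::State;
    /// Fold the state of another partial aggregation into this one.
    fn merge(&mut self, other: Self::State);
    /// Produce the final result; `None` if no value was seen.
    fn evaluate(&self) -> Option<f64>;
}

/// Average that keeps (sum, count) instead of a running mean.
#[derive(Default)]
struct Avg {
    sum: f64,
    count: u64,
}

impl Accumulator for Avg {
    type State = (f64, u64);

    fn update(&mut self, value: f64) {
        self.sum += value;
        self.count += 1;
    }

    fn state(&self) -> Self::State {
        (self.sum, self.count)
    }

    fn merge(&mut self, other: Self::State) {
        self.sum += other.0;
        self.count += other.1;
    }

    fn evaluate(&self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.sum / self.count as f64)
        }
    }
}

/// One partial accumulator per partition, merged into a final accumulator.
fn avg_over_partitions(partitions: &[Vec<f64>]) -> Option<f64> {
    let mut final_acc = Avg::default();
    for partition in partitions {
        let mut partial = Avg::default();
        for &v in partition {
            partial.update(v);
        }
        final_acc.merge(partial.state());
    }
    final_acc.evaluate()
}

fn main() {
    // Same partitioning as the failing test: [1, 2, 3] and [4, 5].
    let partitions = vec![vec![1.0, 2.0, 3.0], vec![4.0, 5.0]];
    println!("{:?}", avg_over_partitions(&partitions));
}
```

The point of the design is that the division happens only in `evaluate`, so the result does not depend on how the rows were split across partitions.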

      Code with a failing test (src/execution/context.rs)

          #[test]
          fn simple_avg() -> Result<()> {
              let schema = Schema::new(vec![
                  Field::new("a", DataType::Int32, false),
              ]);
      
              let batch1 = RecordBatch::try_new(
                  Arc::new(schema.clone()),
                  vec![
                      Arc::new(Int32Array::from(vec![1, 2, 3])),
                  ],
              )?;
              let batch2 = RecordBatch::try_new(
                  Arc::new(schema.clone()),
                  vec![
                      Arc::new(Int32Array::from(vec![4, 5])),
                  ],
              )?;
      
              let mut ctx = ExecutionContext::new();
      
              let provider = MemTable::new(Arc::new(schema), vec![vec![batch1], vec![batch2]])?;
              ctx.register_table("t", Box::new(provider));
      
              let result = collect(&mut ctx, "SELECT AVG(a) FROM t")?;
      
              let batch = &result[0];
              assert_eq!(1, batch.num_columns());
              assert_eq!(1, batch.num_rows());
      
              let values = batch
                  .column(0)
                  .as_any()
                  .downcast_ref::<Float64Array>()
                  .expect("failed to downcast to Float64Array");
              assert_eq!(values.len(), 1);
              // avg(1,2,3,4,5) = 3.0
              assert_eq!(values.value(0), 3.0_f64);
              Ok(())
          }
      

            People

              jorgecarleitao Jorge Leitão

                Time Tracking

                  Original Estimate: Not Specified
                  Remaining Estimate: 0h
                  Time Spent: 8h 10m