Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-7338

User Defined aggregation with constants causes error under in lowerbound over window extraction

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Critical
    • Resolution: Fixed
    • 1.3.1
    • 1.4.0
    • Table SQL / API
    • None

    Description

      A user defined aggregation that passes a constant among the arguments causes a RuntimeException extracting the lower boundary over window.

      val sqlQuery = "SELECT a, " +
            "  myAgg(a, CAST('1' as BIGINT)) "+
            "   OVER (ORDER BY proctime RANGE BETWEEN INTERVAL '30' SECOND 
                     PRECEDING AND CURRENT ROW) " +
            "FROM MyTable"
      

      The error is in the org.apache.flink.table.plan.nodes.OverAggregate.scala

      we do : field count - lower bound index
      – which causes a -1 get, and subsequent RuntimeException.
      We should do: lower bound offset - field count to find the value in the constant array.

      The code below should fix the problem.

      private[flink] def getLowerBoundary(
          logicWindow: Window,
          overWindow: Group,
          input: RelNode): Long = {
      
          val ref: RexInputRef = overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef]
          val lowerBoundIndex = ref.getIndex - input.getRowType.getFieldCount
          val lowerBound = logicWindow.constants.get(lowerBoundIndex).getValue2
          lowerBound match {
            case x: java.math.BigDecimal => x.asInstanceOf[java.math.BigDecimal].longValue()
            case _ => lowerBound.asInstanceOf[Long]
          }
        }
      

      Attachments

        Activity

          People

            fhueske Fabian Hueske
            stefano.bortoli Stefano Bortoli
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: