Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-20440

`LAST_VALUE` aggregate function can not be used in hop window

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Major
    • Resolution: Duplicate
    • 1.11.0
    • None
    • Table SQL / Planner
    • None

    Description

      Hi,  I run a sql job which use `last_value`  aggregate function in a hop window,  the sql as shown below

      create table test_in(
        id BIGINT,
        `name` VARCHAR,
        cost INT,
        proctime as PROCTIME()
      ) with (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'test_in',
        'connector.startup-mode' = 'latest-offset',
        'connector.properties.bootstrap.servers' = '****',
        'connector.properties.group.id' = 'cdbddd',
        'connector.properties.zookeeper.connect' = '',
        'format.type' = 'csv'
      );
      create table test_mysql(
        id BIGINT,
        `name` VARCHAR,
        COST DOUBLE
      ) with (
        'connector.type' = 'jdbc',
      'connector.url' = '****',
        'connector.table' = 'abc',
        'connector.username' = 'abcdd',
        'connector.write.flush.interval' = '2s'
      );insert into
        `test_mysql`
      select
        a.id,
        last_value(a.`name`),
        last_value(a.cost)
      from
        test_in as a group by id, HOP(PROCTIME(), interval '10' second, interval '30' second);
      

      and when submit the job, the exception throws 

      org.apache.flink.table.api.ValidationException: Function class 'org.apache.flink.table.planner.functions.aggfunctions.LastValueAggFunction.StringLastValueAggFunction' does not implement at least one method named 'merge' which is public, not abstract and (in case of table functions) not static.
              at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.checkAndExtractMethods(UserDefinedFunctionUtils.scala:442)
              at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:318)
              at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:273)
              at org.apache.flink.table.planner.codegen.agg.ImperativeAggCodeGen.checkNeededMethods(ImperativeAggCodeGen.scala:474)
              at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.$anonfun$checkNeededMethods$1(AggsHandlerCodeGenerator.scala:1116)
              at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.$anonfun$checkNeededMethods$1$adapted(AggsHandlerCodeGenerator.scala:1116)
              at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
              at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
              at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
              at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.checkNeededMethods(AggsHandlerCodeGenerator.scala:1116)
              at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genMerge(AggsHandlerCodeGenerator.scala:929)
              at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateNamespaceAggsHandler(AggsHandlerCodeGenerator.scala:578)
              at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.createAggsHandler(StreamExecGroupWindowAggregateBase.scala:248)
              at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:162)
              at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:54)
              at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
              at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
              at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlan(StreamExecGroupWindowAggregateBase.scala:54)
              at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
              at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
              at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
              at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
              at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
              at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:165)
              at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:105)
              at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
              at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
              at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
              at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
              at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:67)
              at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
              at scala.collection.Iterator.foreach(Iterator.scala:941)
              at scala.collection.Iterator.foreach$(Iterator.scala:941)
              at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
              at scala.collection.IterableLike.foreach(IterableLike.scala:74)
              at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
              at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
              at scala.collection.TraversableLike.map(TraversableLike.scala:238)
              at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
              at scala.collection.AbstractTraversable.map(Traversable.scala:108)
              at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
              at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:104)
              at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:43)
              at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainSql(TableEnvironmentImpl.java:644)
      
      

       

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              zouyunhe KevinyhZou
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: