  Flink / FLINK-22982

java.lang.ClassCastException when using Python UDF

Details

    Description

      Hi,

      I'm trying to use a Python UDF that takes a logical condition as an argument.

       

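      import logging

      from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
      from pyflink.table.udf import udf

      log = logging.getLogger()

      # Assumption: the report does not show how table_env was created;
      # a streaming TableEnvironment is used here.
      table_env = TableEnvironment.create(
          EnvironmentSettings.new_instance().in_streaming_mode().build())

      # Logs the message when the condition holds, then passes the condition through.
      @udf(result_type=DataTypes.BOOLEAN())
      def trace(message, condition):
          if condition:
              log.warning(message)  # Logger.warn is a deprecated alias
          return condition

      table_env.create_temporary_function('trace', trace)

      # Source table producing 10 random integers.
      table_env.execute_sql("""
      CREATE TABLE datagen (
          n int
      ) WITH (
          'connector' = 'datagen',
          'number-of-rows' = '10'
      )
      """)

      # The second UDF argument is a boolean expression rather than a plain column.
      result = table_env.sql_query("""
      SELECT *
      FROM datagen
      WHERE trace(n, n < 0)
      """)
      for r in result.execute().collect():
          print(r)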

       

      As a result, I'm getting the following exception:

      Py4JJavaError: An error occurred while calling o135.execute.
      : java.lang.ClassCastException: class org.apache.calcite.rex.RexInputRef cannot be cast to class org.apache.calcite.rex.RexCall (org.apache.calcite.rex.RexInputRef and org.apache.calcite.rex.RexCall are in unnamed module of loader 'app')
      	at org.apache.flink.table.planner.plan.rules.logical.PythonMapMergeRule.matches(PythonMapMergeRule.java:70)
      	at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:538)
      	at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
      	at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
      	at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
      	at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
      	at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
      	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
      	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
      	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
      	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
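
For reference, the result the query is presumably meant to produce can be sketched in plain Python, without Flink. This is only an illustration of the intended semantics, not of the planner behavior that fails: `filter_rows` and the sample values are hypothetical and do not appear in the report, and the real `datagen` source emits random integers.

```python
import logging

log = logging.getLogger(__name__)


def trace(message, condition):
    """Plain-Python stand-in for the UDF body: log when the condition holds, then return it."""
    if condition:
        log.warning(message)
    return condition


def filter_rows(rows):
    """Hypothetical helper mimicking `SELECT * FROM datagen WHERE trace(n, n < 0)`."""
    return [n for n in rows if trace(n, n < 0)]


# Made-up sample values standing in for the random datagen rows.
sample = [3, -1, 0, -7]
kept = filter_rows(sample)
print(kept)
```

Only the negative values should survive the filter, each being logged once at WARNING level. The report shows that the Flink job fails during plan optimization, before any row reaches the UDF.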
      

       

            People

              hxbks2ks Huang Xingbo
              maver1ck Maciej Bryński