Details
Type: Bug
Status: Resolved
Priority: Critical
Resolution: Fixed
1.13.0
Description
Hi,
I'm trying to use a Python UDF with a logical condition as an argument.
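    import logging

    from pyflink.table import DataTypes
    from pyflink.table.udf import udf

    # table_env is assumed to be an already created TableEnvironment.
    log = logging.getLogger()

    # Scalar UDF that logs the message when the condition holds
    # and returns the condition unchanged.
    @udf(result_type=DataTypes.BOOLEAN())
    def trace(message, condition):
        if condition:
            log.warn(message)
        return condition

    table_env.create_temporary_function('trace', trace)

    table_env.execute_sql("""
        CREATE TABLE datagen (
            n int
        ) WITH (
            'connector' = 'datagen',
            'number-of-rows' = '10'
        )
    """)

    # The second argument is a logical expression, not a plain column.
    result = table_env.sql_query("""
        SELECT * FROM datagen
        WHERE trace(n, n < 0)
    """)

    for r in result.execute().collect():
        print(r)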
As a result, I'm getting an exception:
    Py4JJavaError: An error occurred while calling o135.execute.
    : java.lang.ClassCastException: class org.apache.calcite.rex.RexInputRef cannot be cast to class org.apache.calcite.rex.RexCall (org.apache.calcite.rex.RexInputRef and org.apache.calcite.rex.RexCall are in unnamed module of loader 'app')
        at org.apache.flink.table.planner.plan.rules.logical.PythonMapMergeRule.matches(PythonMapMergeRule.java:70)
        at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:538)
        at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
        at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
        at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
        at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
        at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
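The stack trace points at the planner (PythonMapMergeRule.matches, called from HepPlanner), so the exception appears to be raised while the query plan is being optimized, before the Python function itself is invoked. As a rough sanity check of that reading, the UDF body can be exercised as a plain function outside Flink. This is only a sketch: `trace_impl` is a hypothetical name that is not part of PyFlink, and the input values are made up for illustration.

```python
import logging

log = logging.getLogger(__name__)


def trace_impl(message, condition):
    """Plain-Python copy of the UDF body (hypothetical helper, not a PyFlink API)."""
    if condition:
        log.warning(message)
    return condition


# Mimic what the query would pass for each row: trace(n, n < 0).
# The sample values below are assumptions, not datagen output.
results = [trace_impl(n, n < 0) for n in (-2, -1, 0, 1)]
print(results)
```

If the function behaves as expected here, that would be consistent with the problem being in how the planner handles the `n < 0` argument rather than in the Python code.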