Flink / FLINK-16936

TableEnv creation and planner execution must be in the same thread


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Won't Fix
    • Affects Version/s: 1.10.0
    • Fix Version/s: None
    • Component/s: Table SQL / API
    • Labels: None

    Description

  I hit this issue in Zeppelin. Let me first describe Zeppelin's threading model: there are three threads. The scala-shell thread is where the TableEnvironment is created, the python thread is the Python process thread, and the python-javagateway thread handles requests from the python thread (the same model as PyFlink).

  Now if I use the following Table API code, I get the exception below.

      st_env.from_path("cdn_access_log")\
         .select("uuid, "
                 "ip_to_province(client_ip) as province, " 
                 "response_size, request_time")\
         .group_by("province")\
         .select( 
                 "province, count(uuid) as access_count, " 
                 "sum(response_size) as total_download,  " 
                 "sum(response_size) * 1.0 / sum(request_time) as download_speed") \
         .insert_into("cdn_access_statistic") 

  The error I get:

      Py4JJavaError: An error occurred while calling o60.insertInto.
      : java.lang.RuntimeException: Error while applying rule FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL), args [rel#107:LogicalAggregate.NONE.any.None: 0.false.UNKNOWN(input=RelSubset#106,group={1},EXPR$0=COUNT($0),EXPR$1=SUM($2),EXPR$2=SUM($3))]
        at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631)
        at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
        at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
        at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
        at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
        at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334)
        at org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
      Caused by: java.lang.RuntimeException: Error occurred while applying rule FlinkLogicalAggregateStreamConverter(in:NONE,out:LOGICAL)
        at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:143)
        at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:236)
        at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:146)
        at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:208)
        ... 34 more
      Caused by: java.lang.NullPointerException
        at scala.Predef$.Double2double(Predef.scala:365)
        at org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate.computeSelfCost(FlinkLogicalAggregate.scala:78)
        at org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:174)
        at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown Source)
        at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown Source)
        at org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:301)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.getCost(VolcanoPlanner.java:936)
        at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements0(RelSubset.java:347)
        at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements(RelSubset.java:330)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1828)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1764)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1939)
        at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:129)
        ... 37 more
      
      (<class 'py4j.protocol.Py4JJavaError'>, Py4JJavaError('An error occurred while calling o60.insertInto.\n', JavaObject id=o61), <traceback object at 0x10fa9efc8>) 

  But it works with Flink SQL. After some investigation, I found the root cause: in Flink SQL, the following code is called in `SqlToRelConverter.java`:

          RelMetadataQuery.THREAD_PROVIDERS.set(
              JaninoRelMetadataProvider.of(cluster.getMetadataProvider()));
      

  But in the Table API, no such code is called. In that case the RelMetadataProvider won't be set properly if TableEnvironment creation and planner execution happen in different threads.
  It still works when TableEnvironment creation and planner execution are in the same thread, because TableEnvironment creation sets the RelMetadataProvider properly in FlinkRelOptClusterFactory.
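  The failure mode can be reproduced in miniature with any thread-local value. Below is a minimal Python sketch (since the reproduction above is PyFlink code); `provider` is a hypothetical stand-in for Calcite's `RelMetadataQuery.THREAD_PROVIDERS` ThreadLocal, and the two threads stand in for the TableEnvironment-creation thread and the py4j gateway thread:

```python
import threading

# A thread-local slot, analogous to Calcite's
# RelMetadataQuery.THREAD_PROVIDERS: a value set in one thread
# is invisible to every other thread.
provider = threading.local()

def create_table_env():
    # Runs on the "creation" thread; installs the provider there only.
    provider.value = "JaninoRelMetadataProvider"

def run_planner(result):
    # Runs on a different thread; the thread-local is unset here,
    # mirroring the NullPointerException in computeSelfCost above.
    result.append(getattr(provider, "value", None))

t1 = threading.Thread(target=create_table_env)
t1.start()
t1.join()

seen = []
t2 = threading.Thread(target=run_planner, args=(seen,))
t2.start()
t2.join()

print(seen[0])  # None: the value set on t1 is not visible on t2
```

  This is why the Flink SQL path works: `SqlToRelConverter` re-sets the provider on whichever thread actually runs the planner, while the Table API path relies on whatever the creation thread installed.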


            People

              Assignee: Unassigned
              Reporter: zjffdu (Jeff Zhang)
