Flink / FLINK-23020

NullPointerException when running collect twice from Python API


Details

    Description

      Hi, 

      I'm trying to use PyFlink from a Jupyter Notebook and I'm getting a NullPointerException in the following scenario.

      1. First, I create a datagen table:

      from pyflink.common import Configuration
      from pyflink.table import EnvironmentSettings, StreamTableEnvironment
      
      conf = Configuration()
      env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
      table_env = StreamTableEnvironment.create(environment_settings=env_settings)
      table_env.get_config().get_configuration().set_integer("parallelism.default", 1)
      
      table_env.execute_sql("DROP TABLE IF EXISTS datagen")
      table_env.execute_sql("""
      CREATE TABLE datagen (
          id INT
      ) WITH (
          'connector' = 'datagen'
      )
      """)
      

      2. Then I run collect:

      try:
          result = table_env.sql_query("select * from datagen limit 1").execute()
          for r in result.collect():
              print(r)
      except KeyboardInterrupt:
          result.get_job_client().cancel()
      

      3. I press the "interrupt the kernel" button. The resulting KeyboardInterrupt is handled by the try/except above, which cancels the query.

      4. I run the collect snippet from step 2 one more time. Result:

      ---------------------------------------------------------------------------
      Py4JJavaError                             Traceback (most recent call last)
      <ipython-input-5-98ef93c07bdb> in <module>
            1 try:
      ----> 2     result = table_env.sql_query("select * from datagen limit 1").execute()
            3     for r in result.collect():
            4         print(r)
            5 except KeyboardInterrupt:
      
      /usr/local/lib/python3.8/dist-packages/pyflink/table/table.py in execute(self)
         1070         """
         1071         self._t_env._before_execute()
      -> 1072         return TableResult(self._j_table.execute())
         1073 
         1074     def explain(self, *extra_details: ExplainDetail) -> str:
      
      /usr/local/lib/python3.8/dist-packages/py4j/java_gateway.py in __call__(self, *args)
         1283 
         1284         answer = self.gateway_client.send_command(command)
      -> 1285         return_value = get_return_value(
         1286             answer, self.gateway_client, self.target_id, self.name)
         1287 
      
      /usr/local/lib/python3.8/dist-packages/pyflink/util/exceptions.py in deco(*a, **kw)
          144     def deco(*a, **kw):
          145         try:
      --> 146             return f(*a, **kw)
          147         except Py4JJavaError as e:
          148             from pyflink.java_gateway import get_gateway
      
      /usr/local/lib/python3.8/dist-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
          324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
          325             if answer[1] == REFERENCE_TYPE:
      --> 326                 raise Py4JJavaError(
          327                     "An error occurred while calling {0}{1}{2}.\n".
          328                     format(target_id, ".", name), value)
      
      Py4JJavaError: An error occurred while calling o69.execute.
      : java.lang.NullPointerException
      	at java.base/java.util.Objects.requireNonNull(Objects.java:221)
      	at org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:144)
      	at org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:108)
      	at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.<init>(FlinkRelMetadataQuery.java:73)
      	at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:54)
      	at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
      	at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
      	at org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
      	at org.apache.calcite.rel.metadata.RelMdUtil.clearCache(RelMdUtil.java:965)
      	at org.apache.calcite.plan.hep.HepPlanner.clearCache(HepPlanner.java:879)
      	at org.apache.calcite.plan.hep.HepPlanner.contractVertices(HepPlanner.java:858)
      	at org.apache.calcite.plan.hep.HepPlanner.applyTransformationResults(HepPlanner.java:745)
      	at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:545)
      	at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
      	at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:271)
      	at org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
      	at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
      	at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
      	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
      	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
      	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
      	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
      	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
      	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
      	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
      	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
      	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
      	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
      	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
      	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
      	at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
      	at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
      	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
      	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
      	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:791)
      	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1225)
      	at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:577)
      	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
      	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
      	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
      	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
      	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
      	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
      	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
      	at java.base/java.lang.Thread.run(Thread.java:829)
      

       

      PS. When I cancel the job from the Web UI, I am able to run collect twice. The problem occurs only when the job is cancelled from the code.
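
      A note not in the original report, which may be relevant to the repro: PyFlink's collect() returns a closeable iterator, and the except branch in step 2 cancels the job without ever closing it. Below is a minimal, Flink-free sketch of the close-in-finally pattern; CloseableResult and consume_with_cleanup are hypothetical stand-ins (not real PyFlink classes) for the iterator returned by TableResult.collect(), and only the cleanup pattern is meant to carry over.

      ```python
      # Sketch only: CloseableResult is a hypothetical stand-in for the
      # closeable iterator that PyFlink's TableResult.collect() returns.
      # It demonstrates closing the result in a finally block so cleanup
      # also runs when iteration is interrupted.

      class CloseableResult:
          """Stand-in for a closeable result iterator (not a real PyFlink class)."""

          def __init__(self, rows):
              self._rows = iter(rows)
              self.closed = False

          def __iter__(self):
              return self

          def __next__(self):
              return next(self._rows)

          def close(self):
              self.closed = True


      def consume_with_cleanup(result):
          """Collect all rows, closing the result even if iteration is interrupted."""
          rows = []
          try:
              for r in result:
                  rows.append(r)
          finally:
              result.close()  # runs on normal exit and on KeyboardInterrupt alike
          return rows


      res = CloseableResult([1, 2, 3])
      print(consume_with_cleanup(res))  # [1, 2, 3]
      print(res.closed)  # True
      ```

      In real PyFlink code the analogous move would be closing the collect iterator before (or instead of) calling get_job_client().cancel(); whether that avoids this NPE is something this ticket would need to confirm.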

            People

              Assignee: swtwsk (Andrzej Swatowski)
              Reporter: maver1ck (Maciej Bryński)