Details
- Type: Bug
- Status: Open
- Priority: Major
- Resolution: Unresolved
- Affects Versions: 1.13.1, 1.14.6, 1.15.3, 1.16.1
- Fix Version/s: None
Description
Hi,
I'm trying to use PyFlink from a Jupyter Notebook and I'm getting a NullPointerException (NPE) in the following scenario.
1. I'm creating a datagen table:
from pyflink.table import EnvironmentSettings, TableEnvironment, StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf
from pyflink.common import Configuration, Row
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.java_gateway import get_gateway

conf = Configuration()
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)
table_env.get_config().get_configuration().set_integer("parallelism.default", 1)

table_env.execute_sql("DROP TABLE IF EXISTS datagen")
table_env.execute_sql("""
    CREATE TABLE datagen (
        id INT
    ) WITH (
        'connector' = 'datagen'
    )
""")
2. Then I'm running collect:
try:
    result = table_env.sql_query("select * from datagen limit 1").execute()
    for r in result.collect():
        print(r)
except KeyboardInterrupt:
    result.get_job_client().cancel()
3. I'm using the "interrupt the kernel" button. This is handled by the try/except above and cancels the query.
4. I'm running the collect from point 2 one more time. Result:
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-5-98ef93c07bdb> in <module>
      1 try:
----> 2     result = table_env.sql_query("select * from datagen limit 1").execute()
      3     for r in result.collect():
      4         print(r)
      5 except KeyboardInterrupt:

/usr/local/lib/python3.8/dist-packages/pyflink/table/table.py in execute(self)
   1070         """
   1071         self._t_env._before_execute()
-> 1072         return TableResult(self._j_table.execute())
   1073
   1074     def explain(self, *extra_details: ExplainDetail) -> str:

/usr/local/lib/python3.8/dist-packages/py4j/java_gateway.py in __call__(self, *args)
   1283
   1284         answer = self.gateway_client.send_command(command)
-> 1285         return_value = get_return_value(
   1286             answer, self.gateway_client, self.target_id, self.name)
   1287

/usr/local/lib/python3.8/dist-packages/pyflink/util/exceptions.py in deco(*a, **kw)
    144     def deco(*a, **kw):
    145         try:
--> 146             return f(*a, **kw)
    147         except Py4JJavaError as e:
    148             from pyflink.java_gateway import get_gateway

/usr/local/lib/python3.8/dist-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o69.execute.
: java.lang.NullPointerException
	at java.base/java.util.Objects.requireNonNull(Objects.java:221)
	at org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:144)
	at org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:108)
	at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.<init>(FlinkRelMetadataQuery.java:73)
	at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:54)
	at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
	at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
	at org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
	at org.apache.calcite.rel.metadata.RelMdUtil.clearCache(RelMdUtil.java:965)
	at org.apache.calcite.plan.hep.HepPlanner.clearCache(HepPlanner.java:879)
	at org.apache.calcite.plan.hep.HepPlanner.contractVertices(HepPlanner.java:858)
	at org.apache.calcite.plan.hep.HepPlanner.applyTransformationResults(HepPlanner.java:745)
	at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:545)
	at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
	at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:271)
	at org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
	at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
	at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
	at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
	at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:791)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1225)
	at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:577)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
PS. When I cancel the job from the Web UI, I'm able to run collect twice. The problem exists only when cancelling the job from the code.
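A possible mitigation, sketched below, is to block on the cancellation before issuing the next query, so that the cancellation has finished (as it has by the time you re-run after cancelling from the Web UI). This is an untested sketch: it assumes PyFlink's JobClient.cancel() returns a future whose .result() blocks until completion, and it is not a confirmed workaround for this NPE.

```python
def run_query_with_cancel(table_env, query):
    """Execute `query` and print rows; on Ctrl-C (kernel interrupt),
    cancel the job and wait for the cancellation to complete before
    returning, instead of firing cancel() and moving on."""
    result = table_env.sql_query(query).execute()
    try:
        for row in result.collect():
            print(row)
    except KeyboardInterrupt:
        job_client = result.get_job_client()
        if job_client is not None:
            # Assumption: cancel() returns a future; .result() blocks
            # until the cancellation has actually finished.
            job_client.cancel().result()

if __name__ == "__main__":
    # PyFlink imports are kept inside the entry point so the helper
    # above can be imported without a Flink installation.
    from pyflink.table import EnvironmentSettings, StreamTableEnvironment

    env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
    table_env = StreamTableEnvironment.create(environment_settings=env_settings)
    table_env.execute_sql("DROP TABLE IF EXISTS datagen")
    table_env.execute_sql(
        "CREATE TABLE datagen (id INT) WITH ('connector' = 'datagen')")
    run_query_with_cancel(table_env, "select * from datagen limit 1")
```

If waiting on the future still reproduces the NPE, that would narrow the bug down to the cancel code path itself rather than a race with an in-flight cancellation.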