Details
- Type: Bug
- Status: Closed
- Priority: Major
- Resolution: Fixed
- Affects Version/s: 1.14.0
Description
For the following code:
import numpy as np
from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.table import StreamTableEnvironment
from sklearn import svm, datasets

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)

# Table source
t_env.execute_sql("""
    CREATE TABLE my_source (
      a FLOAT,
      key STRING
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '1',
      'fields.a.min' = '4.3',
      'fields.a.max' = '7.9',
      'fields.key.length' = '10'
    )
""")


def process_type():
    return Types.ROW_NAMED(
        ["a", "key"],
        [Types.FLOAT(), Types.STRING()]
    )


# Append-only datastream
ds = t_env.to_append_stream(
    t_env.from_path('my_source'),
    process_type())


class MyKeyedProcessFunction(KeyedProcessFunction):

    def open(self, runtime_context: RuntimeContext):
        clf = svm.SVC()
        X, y = datasets.load_iris(return_X_y=True)
        clf.fit(X, y)
        self.model = clf

    def process_element(self, value: Row, ctx: 'KeyedProcessFunction.Context'):
        # Query the round settlement log from Redis based on role_id + space
        features = np.array([value['a'], 3.5, 1.4, 0.2]).reshape(1, -1)
        predict = int(self.model.predict(features)[0])
        yield Row(predict=predict, role_id=value['key'])


ds = ds.key_by(lambda a: a['key'], key_type=Types.STRING()) \
    .process(
        MyKeyedProcessFunction(),
        output_type=Types.ROW_NAMED(
            ["hit", "role_id"],
            [Types.INT(), Types.STRING()]
        ))

# Use a table sink
t_env.execute_sql("""
    CREATE TABLE my_sink (
      hit INT,
      role_id STRING
    ) WITH (
      'connector' = 'print'
    )
""")

t_env.create_temporary_view("predict", ds)

t_env.execute_sql("""
    INSERT INTO my_sink
    SELECT * FROM predict
""").wait()
It reported the following exception:
Caused by: java.lang.IllegalArgumentException: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s. It may be because the consumer type "Python" was missing or set to 0 for the config option "taskmanager.memory.managed.consumer-weights".0.0
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:233)
    at org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:56)
    at org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFunctionOperator.open(AbstractOneInputPythonFunctionOperator.java:116)
    at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:121)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:712)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:688)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:655)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
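The exception shows that the managed memory fraction computed for the Python worker ended up as 0.0, which the precondition check in BeamPythonFunctionRunner rejects. Following only the hint in the message itself, a possible workaround (a sketch, not a confirmed fix for this issue) is to declare a non-zero weight for the PYTHON consumer type under taskmanager.memory.managed.consumer-weights, either in flink-conf.yaml or, for a quick local test, programmatically; the weight values below are illustrative:

# Unverified workaround sketch, based only on the hint in the exception message:
# give the PYTHON consumer type a non-zero managed-memory weight. The weight
# values are illustrative; this key is normally a cluster-level setting
# (flink-conf.yaml), so setting it on the table config may only help in local runs.
t_env.get_config().get_configuration().set_string(
    "taskmanager.memory.managed.consumer-weights",
    "OPERATOR:70,STATE_BACKEND:70,PYTHON:30")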