Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-26920

Job execution fails with "The configured managed memory fraction for Python worker process must be within (0, 1], was: %s."

    XML | Word | Printable | JSON

Details

    Description

      For the following code:

      import numpy as np
      from pyflink.common import Row
      from pyflink.common.typeinfo import Types
      from pyflink.datastream import StreamExecutionEnvironment
      from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
      from pyflink.table import StreamTableEnvironment
      from sklearn import svm, datasets
      
      # Create the DataStream execution environment and a table environment
      # bound to it; both are reused by the rest of the script.
      env = StreamExecutionEnvironment.get_execution_environment()
      t_env = StreamTableEnvironment.create(stream_execution_environment=env)
      
      # Table source: a `datagen` table configured for 1 row per second, with
      # column `a` bounded to [4.3, 7.9] and a 10-character string `key`.
      t_env.execute_sql("""
          CREATE TABLE my_source (
              a FLOAT,
              key STRING
          ) WITH (
              'connector' = 'datagen',
              'rows-per-second' = '1',
              'fields.a.min' = '4.3',
              'fields.a.max' = '7.9',
              'fields.key.length' = '10'
          )
      """)
      
      
      def process_type():
          """Return the named-row type info for source rows: ``a`` FLOAT, ``key`` STRING."""
          field_names = ["a", "key"]
          field_types = [Types.FLOAT(), Types.STRING()]
          return Types.ROW_NAMED(field_names, field_types)
      
      
      # Convert the `my_source` table into an append-only DataStream, using
      # the row type info returned by process_type().
      ds = t_env.to_append_stream(
          t_env.from_path('my_source'),
          process_type())
      
      
      class MyKeyedProcessFunction(KeyedProcessFunction):
          """Keyed process function that scores each row with an SVM fitted on the iris dataset."""

          def open(self, runtime_context: RuntimeContext):
              """Fit an ``svm.SVC`` on the iris data and store it on ``self.model``."""
              samples, labels = datasets.load_iris(return_X_y=True)
              classifier = svm.SVC()
              classifier.fit(samples, labels)

              self.model = classifier

          def process_element(self, value: Row, ctx: 'KeyedProcessFunction.Context'):
              """Yield one ``Row(predict, role_id)`` for the incoming row.

              The feature vector is ``value['a']`` followed by three fixed
              constants, reshaped to a single-sample 2-D array.
              """
              # NOTE: the original comment here said "query the round-settlement
              # log from redis by role_id + space"; no such lookup is performed.
              sample = np.array([value['a'], 3.5, 1.4, 0.2]).reshape(1, -1)
              predictions = self.model.predict(sample)
              label = int(predictions[0])

              yield Row(predict=label, role_id=value['key'])
      
      
              
      # Key the stream by the `key` field and run MyKeyedProcessFunction on it,
      # declaring the output as a named row with fields `hit` (INT) and
      # `role_id` (STRING).
      # NOTE(review): the function yields Row(predict=..., role_id=...), while
      # the declared output field is named "hit" -- confirm the field mapping
      # is the intended one.
      ds = ds.key_by(lambda a: a['key'], key_type=Types.STRING()) \
          .process(
              MyKeyedProcessFunction(), 
              output_type=Types.ROW_NAMED(
                  ["hit", "role_id"],
                  [Types.INT(), Types.STRING()]
          ))
      
      
      # Use a table sink: `my_sink` is declared with the `print` connector.
      t_env.execute_sql("""
              CREATE TABLE my_sink (
                hit INT,
                role_id STRING
              ) WITH (
                'connector' = 'print'
              )
          """)
      
      # Register the processed DataStream as the temporary view `predict`,
      # then insert every row of that view into `my_sink`; wait() is called
      # on the result of the INSERT statement.
      t_env.create_temporary_view("predict", ds)
      t_env.execute_sql("""
          INSERT INTO my_sink
          SELECT * FROM predict
      """).wait()
      

      It reported the following exception:

      Caused by: java.lang.IllegalArgumentException: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s. It may be because the consumer type "Python" was missing or set to 0 for the config option "taskmanager.memory.managed.consumer-weights".0.0
      	at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
      	at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:233)
      	at org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:56)
      	at org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFunctionOperator.open(AbstractOneInputPythonFunctionOperator.java:116)
      	at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:121)
      	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:712)
      	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:688)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:655)
      	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
      	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
      	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
      	at java.lang.Thread.run(Thread.java:748)
      

      Attachments

        Issue Links

          Activity

            People

              dianfu Dian Fu
              dianfu Dian Fu
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: