Flink / FLINK-17923

It will throw MemoryAllocationException if the RocksDB state backend and Python UDFs are used in the same slot

    Description

      For the following job:

      import logging
      import os
      import shutil
      import sys
      import tempfile
      
      from pyflink.datastream import StreamExecutionEnvironment
      from pyflink.table import TableConfig, StreamTableEnvironment, DataTypes
      from pyflink.table.udf import udf
      
      
      def word_count():
          content = "line Licensed to the Apache Software Foundation ASF under one " \
                    "line or more contributor license agreements See the NOTICE file " \
                    "line distributed with this work for additional information " \
                    "line regarding copyright ownership The ASF licenses this file " \
                    "to you under the Apache License Version the " \
                    "License you may not use this file except in compliance " \
                    "with the License"
      
          t_config = TableConfig()
          env = StreamExecutionEnvironment.get_execution_environment()
          t_env = StreamTableEnvironment.create(env, t_config)
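
          # NOTE: the RocksDB state backend is not configured in this script; it has to be
          # enabled separately (e.g. state.backend: rocksdb in flink-conf.yaml) in order to
          # reproduce the exception described below.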
      
          # register Results table in table environment
          tmp_dir = tempfile.gettempdir()
          result_path = tmp_dir + '/result'
          if os.path.exists(result_path):
              try:
                  if os.path.isfile(result_path):
                      os.remove(result_path)
                  else:
                      shutil.rmtree(result_path)
              except OSError as e:
                  logging.error("Error removing directory: %s - %s.", e.filename, e.strerror)
      
          logging.info("Results directory: %s", result_path)
      
          sink_ddl = """
              create table Results(
                  word VARCHAR,
                  `count` BIGINT
              ) with (
                  'connector' = 'blackhole'
              )
              """
          t_env.sql_update(sink_ddl)
      
          @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
          def inc(count):
              return count + 1
          t_env.register_function("inc", inc)
      
          elements = [(word, 1) for word in content.split(" ")]
          t_env.from_elements(elements, ["word", "count"]) \
               .group_by("word") \
               .select("word, count(1) as count") \
               .select("word, inc(count) as count") \
               .insert_into("Results")
      
          t_env.execute("word_count")
      
      
      if __name__ == '__main__':
          logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
      
          word_count()
      

      It will throw the following exception if the RocksDB state backend is used (a sketch of one way to enable the backend follows the stack trace):

      Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_c27dcf7b54ef6bfd6cff02ca8870b681_(1/1) from any of the 1 provided restore options.
      	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
      	... 9 more
      Caused by: java.io.IOException: Failed to acquire shared cache resource for RocksDB
      	at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:212)
      	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:516)
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
      	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
      	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
      	... 11 more
      Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could not created the shared memory resource of size 536870920. Not enough memory left to reserve from the slot's managed memory.
      	at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:603)
      	at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
      	at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
      	at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:617)
      	at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:566)
      	at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:208)
      	... 15 more
      Caused by: org.apache.flink.runtime.memory.MemoryReservationException: Could not allocate 536870920 bytes. Only 454033416 bytes are remaining.
      	at org.apache.flink.runtime.memory.MemoryManager.reserveMemory(MemoryManager.java:461)
      	at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:601)
      	... 20 more
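
      As the MemoryReservationException above shows, part of the slot's managed memory has already been reserved by the time RocksDB tries to acquire its shared cache, presumably by the Python UDF worker running in the same slot. The state backend is not configured in the reproducing script itself; purely for reference, here is a minimal sketch of enabling RocksDB programmatically (assuming PyFlink's pyflink.datastream.state_backend module; the checkpoint URI is just a placeholder):

      from pyflink.datastream import StreamExecutionEnvironment
      from pyflink.datastream.state_backend import RocksDBStateBackend

      env = StreamExecutionEnvironment.get_execution_environment()
      # Use RocksDB for keyed state; with managed memory enabled it reserves a shared
      # cache from the slot's managed memory, which is the step that fails in the trace above.
      env.set_state_backend(RocksDBStateBackend("file:///tmp/checkpoints"))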
      

    People

      Assignee: Dian Fu
      Reporter: Dian Fu