Apache NiFi / NIFI-12739

Python custom processor cannot import ProcessPoolExecutor

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.0.0-M2
    • Fix Version/s: 2.0.0-M3
    • Component/s: Extensions
    • Labels: None

    Description

      A RuntimeError is raised when a Python custom processor tries to import ProcessPoolExecutor from concurrent.futures. This also breaks libraries such as llama-index, which import ProcessPoolExecutor internally.

      My system's full stack trace (see below for a simpler stack trace):

      py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
        File "/opt/nifi-2.0.0-SNAPSHOT/python/framework/py4j/java_gateway.py", line 2466, in _call_proxy
          return_value = getattr(self.pool[obj_id], method)(*params)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        File "/opt/nifi-2.0.0-SNAPSHOT/./python/framework/Controller.py", line 75, in createProcessor
          processorClass = self.extensionManager.getProcessorClass(processorType, version, work_dir)
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        File "/opt/nifi-2.0.0-SNAPSHOT/python/framework/ExtensionManager.py", line 104, in getProcessorClass
          processor_class = self.__load_extension_module(module_file, details.local_dependencies)
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        File "/opt/nifi-2.0.0-SNAPSHOT/python/framework/ExtensionManager.py", line 360, in __load_extension_module
          module_spec.loader.exec_module(module)
        File "<frozen importlib._bootstrap_external>", line 940, in exec_module
        File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
        File "/Users/aethier/playground/the_source/datavolo/datavolo-resources/demo/advanced_rag_small_to_big/processors/RedisVectorStoreProcessor.py", line 4, in <module>
          from llama_index import GPTVectorStoreIndex, StorageContext, ServiceContext, Document
        File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/__init__.py", line 24, in <module>
          from llama_index.indices import (
        File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/indices/__init__.py", line 4, in <module>
          from llama_index.indices.composability.graph import ComposableGraph
        File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/indices/composability/__init__.py", line 4, in <module>
          from llama_index.indices.composability.graph import ComposableGraph
        File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/indices/composability/graph.py", line 7, in <module>
          from llama_index.indices.base import BaseIndex
        File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/indices/base.py", line 10, in <module>
          from llama_index.ingestion import run_transformations
        File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/ingestion/__init__.py", line 2, in <module>
          from llama_index.ingestion.pipeline import (
        File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/ingestion/pipeline.py", line 5, in <module>
          from concurrent.futures import ProcessPoolExecutor
        File "<frozen importlib._bootstrap>", line 1229, in _handle_fromlist
        File "/opt/homebrew/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/__init__.py", line 44, in __getattr__
          from .process import ProcessPoolExecutor as pe
        File "/opt/homebrew/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/process.py", line 106, in <module>
          threading._register_atexit(_python_exit)
        File "/opt/homebrew/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/threading.py", line 1527, in _register_atexit
          raise RuntimeError("can't register atexit after shutdown")
      RuntimeError: can't register atexit after shutdown
      
      	at py4j.Protocol.getReturnValue(Protocol.java:476)
      	at org.apache.nifi.py4j.client.PythonProxyInvocationHandler.invoke(PythonProxyInvocationHandler.java:64)
      	at jdk.proxy8/jdk.proxy8.$Proxy95.createProcessor(Unknown Source)
      	at org.apache.nifi.py4j.StandardPythonBridge$1.createProcessor(StandardPythonBridge.java:116)
      	at org.apache.nifi.py4j.StandardPythonProcessorBridge.initializePythonSide(StandardPythonProcessorBridge.java:106)
      	at org.apache.nifi.py4j.StandardPythonProcessorBridge.lambda$initialize$0(StandardPythonProcessorBridge.java:67)
      	at java.base/java.lang.VirtualThread.run(VirtualThread.java:309)

      Note that the problem occurs with both Python 3.9 and Python 3.11, on both the NiFi 2.0.0 release and the main branch.

       

       

      The following is a simpler stack trace showing the same failure:

       

      Traceback (most recent call last):
        File "/configuration_resources/python_extensions/ImportTestProcessor.py", line 26, in transform
          from concurrent.futures import ProcessPoolExecutor
        File "<frozen importlib._bootstrap>", line 1055, in _handle_fromlist
        File "/usr/lib/python3.9/concurrent/futures/__init__.py", line 44, in __getattr__
          from .process import ProcessPoolExecutor as pe
        File "/usr/lib/python3.9/concurrent/futures/process.py", line 101, in <module>
          threading._register_atexit(_python_exit)
        File "/usr/lib/python3.9/threading.py", line 1374, in _register_atexit
          raise RuntimeError("can't register atexit after shutdown")
      RuntimeError: can't register atexit after shutdown
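
      The message in both traces is raised by CPython's threading module, which refuses to register new exit handlers once its shutdown has begun. The sketch below is one way to trigger the same message in plain Python, outside NiFi: a non-daemon thread performs the import after the main thread has finished. It is an illustration only and does not establish that this is what happens inside NiFi's Python bridge; the names CHILD_SCRIPT and run_late_import are hypothetical.

```python
import subprocess
import sys

# Child program (hypothetical, for illustration): a non-daemon thread waits
# until the main thread has finished, then performs the same import that
# fails in the traces above.
CHILD_SCRIPT = r'''
import threading

def late_import():
    threading.main_thread().join()  # returns once the main thread is done
    try:
        from concurrent.futures import ProcessPoolExecutor
        print("import succeeded")
    except RuntimeError as exc:
        print(f"RuntimeError: {exc}")

threading.Thread(target=late_import).start()
# The main thread ends here; the interpreter still waits for late_import().
'''


def run_late_import() -> str:
    """Run CHILD_SCRIPT in a fresh interpreter and return what it printed."""
    result = subprocess.run(
        [sys.executable, "-c", CHILD_SCRIPT],
        capture_output=True,
        text=True,
        timeout=60,
    )
    return result.stdout.strip()


if __name__ == "__main__":
    print(run_late_import())
```

      On CPython 3.9 and later, this is expected to print the same "can't register atexit after shutdown" message, because concurrent.futures.process registers its exit handler through threading._register_atexit at import time.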

      When the import fails, it can be hard to find the Python stack trace in the logs, and sometimes the processor does not repeat the initialization (so the stack trace is reported only once).

       

      The following custom Python processor can be used to generate the stack trace snippet in an easily repeatable way:

      import traceback

      from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult


      class ImportTestProcessor(FlowFileTransform):
          """Test Python imports from inside a NiFi Python processor."""

          class Java:
              implements = ["org.apache.nifi.python.processor.FlowFileTransform"]

          class ProcessorDetails:
              version = "2.0.0-M1"
              description = """Test Imports"""
              tags = ["test"]

          def __init__(self, **kwargs):
              pass

          def transform(self, context, flowfile):
              # Import inside transform() so that a failure is caught here and
              # written to the FlowFile content instead of breaking processor loading.
              try:
                  from concurrent.futures import ProcessPoolExecutor
              except Exception:
                  # Route to "success" anyway so the stack trace is easy to inspect.
                  stack_trace_str = f"Exception:\n{traceback.format_exc()}"
                  return FlowFileTransformResult(
                      relationship="success", contents=stack_trace_str
                  )

              # Import succeeded: no new content is set on the FlowFile.
              return FlowFileTransformResult(relationship="success")

      When this processor runs and the import fails, the content of the output FlowFile contains the stack trace.
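
      The same check can be applied to other imports, for example llama_index. A minimal sketch, assuming a hypothetical helper named import_report that is not part of NiFi or of the processor above:

```python
import importlib
import traceback


def import_report(module_name, attr=None):
    """Return "" if the import works, otherwise the formatted traceback.

    Hypothetical helper for illustration; not part of the NiFi API.
    """
    try:
        module = importlib.import_module(module_name)
        if attr is not None:
            # Attribute lookup triggers lazy loaders such as the module-level
            # __getattr__ in concurrent.futures.
            getattr(module, attr)
    except Exception:
        return f"Exception:\n{traceback.format_exc()}"
    return ""
```

      Inside transform(), the result of import_report("concurrent.futures", "ProcessPoolExecutor") could be passed as the contents argument of FlowFileTransformResult in the same way as stack_trace_str.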

       


            People

              aethier Alex Ethier
              aethier Alex Ethier

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 20m