Details
- Bug
- Status: Resolved
- Major
- Resolution: Fixed
- 2.0.0-M2
- None
Description
A runtime exception is thrown when a Python custom processor tries to import ProcessPoolExecutor. This also affects libraries that import ProcessPoolExecutor themselves, such as llama-index.
My system's full stack trace (see below for a simpler stack trace):
py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/opt/nifi-2.0.0-SNAPSHOT/python/framework/py4j/java_gateway.py", line 2466, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/nifi-2.0.0-SNAPSHOT/./python/framework/Controller.py", line 75, in createProcessor
    processorClass = self.extensionManager.getProcessorClass(processorType, version, work_dir)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/nifi-2.0.0-SNAPSHOT/python/framework/ExtensionManager.py", line 104, in getProcessorClass
    processor_class = self.__load_extension_module(module_file, details.local_dependencies)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/nifi-2.0.0-SNAPSHOT/python/framework/ExtensionManager.py", line 360, in __load_extension_module
    module_spec.loader.exec_module(module)
  File "<frozen importlib._bootstrap_external>", line 940, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/Users/aethier/playground/the_source/datavolo/datavolo-resources/demo/advanced_rag_small_to_big/processors/RedisVectorStoreProcessor.py", line 4, in <module>
    from llama_index import GPTVectorStoreIndex, StorageContext, ServiceContext, Document
  File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/__init__.py", line 24, in <module>
    from llama_index.indices import (
  File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/indices/__init__.py", line 4, in <module>
    from llama_index.indices.composability.graph import ComposableGraph
  File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/indices/composability/__init__.py", line 4, in <module>
    from llama_index.indices.composability.graph import ComposableGraph
  File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/indices/composability/graph.py", line 7, in <module>
    from llama_index.indices.base import BaseIndex
  File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/indices/base.py", line 10, in <module>
    from llama_index.ingestion import run_transformations
  File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/ingestion/__init__.py", line 2, in <module>
    from llama_index.ingestion.pipeline import (
  File "/opt/nifi-2.0.0-SNAPSHOT/./work/python/extensions/RedisVectorStoreProcessor/2.0.0-M1/llama_index/ingestion/pipeline.py", line 5, in <module>
    from concurrent.futures import ProcessPoolExecutor
  File "<frozen importlib._bootstrap>", line 1229, in _handle_fromlist
  File "/opt/homebrew/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/__init__.py", line 44, in __getattr__
    from .process import ProcessPoolExecutor as pe
  File "/opt/homebrew/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/process.py", line 106, in <module>
    threading._register_atexit(_python_exit)
  File "/opt/homebrew/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/threading.py", line 1527, in _register_atexit
    raise RuntimeError("can't register atexit after shutdown")
RuntimeError: can't register atexit after shutdown
    at py4j.Protocol.getReturnValue(Protocol.java:476)
    at org.apache.nifi.py4j.client.PythonProxyInvocationHandler.invoke(PythonProxyInvocationHandler.java:64)
    at jdk.proxy8/jdk.proxy8.$Proxy95.createProcessor(Unknown Source)
    at org.apache.nifi.py4j.StandardPythonBridge$1.createProcessor(StandardPythonBridge.java:116)
    at org.apache.nifi.py4j.StandardPythonProcessorBridge.initializePythonSide(StandardPythonProcessorBridge.java:106)
    at org.apache.nifi.py4j.StandardPythonProcessorBridge.lambda$initialize$0(StandardPythonProcessorBridge.java:67)
    at java.base/java.lang.VirtualThread.run(VirtualThread.java:309)
Note that the problem occurs with both Python 3.9 and Python 3.11, and on both the NiFi 2.0.0 release and the main branch.
The following is a stacktrace snippet:
Traceback (most recent call last):
  File "/configuration_resources/python_extensions/ImportTestProcessor.py", line 26, in transform
    from concurrent.futures import ProcessPoolExecutor
  File "<frozen importlib._bootstrap>", line 1055, in _handle_fromlist
  File "/usr/lib/python3.9/concurrent/futures/__init__.py", line 44, in __getattr__
    from .process import ProcessPoolExecutor as pe
  File "/usr/lib/python3.9/concurrent/futures/process.py", line 101, in <module>
    threading._register_atexit(_python_exit)
  File "/usr/lib/python3.9/threading.py", line 1374, in _register_atexit
    raise RuntimeError("can't register atexit after shutdown")
RuntimeError: can't register atexit after shutdown
When the import fails, the Python stack trace can be hard to find in the logs, and sometimes the processor does not repeat the initialization, so the stack trace is reported only once.
The following custom python processor can be used to generate the stacktrace snippet in an easily repeatable way:
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
from nifiapi.properties import PropertyDescriptor, StandardValidators, PropertyDependency, ExpressionLanguageScope

###
# Test python imports
##
class ImportTestProcessor(FlowFileTransform):
    class Java:
        implements = ["org.apache.nifi.python.processor.FlowFileTransform"]

    class ProcessorDetails:
        version = "2.0.0-M1"
        description = """Test Imports"""
        tags = ["test"]

    def __init__(self, **kwargs):
        pass

    def transform(self, context, flowfile):
        import traceback

        stack_trace_str = ""
        try:
            from concurrent.futures import ProcessPoolExecutor
        except Exception as e:
            stack_trace_str = f"Exception:\n{traceback.format_exc()}"
            return FlowFileTransformResult(
                relationship="success",
                contents=stack_trace_str
            )
        return FlowFileTransformResult(
            relationship="success"
        )
When this processor runs and the import fails, the stack trace is written to the content of the output flowfile.
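The same capture technique can be tried outside NiFi. The following is only a minimal sketch: `try_import` is a hypothetical helper name, not part of the NiFi API, and in a plain Python interpreter the ProcessPoolExecutor import is expected to succeed, since the failure above was only observed inside the NiFi Python framework.

```python
import importlib
import traceback


def try_import(module_name, attr=None):
    """Hypothetical helper: return "" if the import works, else the traceback text.

    Mirrors the processor above: the failure is captured as a string
    instead of being left to surface only in the logs.
    """
    try:
        module = importlib.import_module(module_name)
        if attr is not None:
            # Attribute access triggers any lazy submodule import the package defines.
            getattr(module, attr)
    except Exception:
        return f"Exception:\n{traceback.format_exc()}"
    return ""


if __name__ == "__main__":
    result = try_import("concurrent.futures", "ProcessPoolExecutor")
    print(result or "import OK")
```

Returning the traceback as a string keeps the result easy to route into flowfile content or another visible place.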