Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Affects Version/s: 3.0.3, 3.1.2, 3.2.1, 3.3.0
- Fix Version/s: None
Description
For example, the following query, run in a pyspark shell (where spark is predefined),
import time

def func(batch_df, batch_id):
    time.sleep(10)
    print(batch_df.count())

q = spark.readStream.format("rate").load().writeStream.foreachBatch(func).start()
time.sleep(5)
q.stop()
works fine when pinned thread mode is disabled. When pinned thread mode is enabled, the query terminates with the following error:
22/05/18 15:23:24 ERROR MicroBatchExecution: Query [id = 2538f8a2-c6e4-44c9-bf38-e6dab555267e, runId = 1d500478-1d77-46aa-b35a-585264a809b9] terminated with error
py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/.../spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/.../spark/python/pyspark/sql/utils.py", line 272, in call
    raise e
  File "/.../spark/python/pyspark/sql/utils.py", line 269, in call
    self.func(DataFrame(jdf, self.session), batch_id)
  File "<stdin>", line 3, in func
  File "/.../spark/python/pyspark/sql/dataframe.py", line 804, in count
    return int(self._jdf.count())
  File "/.../spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/.../spark/python/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/.../spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o44.count.
: java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1302)
	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:242)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:187)
	at org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:334)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:943)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2227)
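
The trace suggests that q.stop() interrupts the streaming execution thread while func is still blocked on batch_df.count(), and that in pinned thread mode the interrupt propagates through the pinned Py4J thread and surfaces as a query failure rather than a graceful stop. For reference, pinned thread mode is toggled with the PYSPARK_PIN_THREAD environment variable (on by default since Spark 3.2). A minimal sketch for reproducing both behaviors; the local[2] master is an illustrative choice, not part of the original report:

import os

# Must be set before the JVM gateway is launched; "false" disables pinned
# thread mode, "true" enables it (the default since Spark 3.2).
os.environ["PYSPARK_PIN_THREAD"] = "false"

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()

# ...then run the repro above: with "false" the query stops cleanly,
# with "true" it terminates with the InterruptedException shown.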