Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Affects Version: 3.5.2
Description
When using PySpark Connect to run `writeTo(...).overwritePartitions(...)` commands, I frequently see that for large queries the client gets stuck and never returns, even though the SQL query is shown as successfully finished in the Spark UI and the data has been written correctly.
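For reference, a minimal sketch of the kind of call involved (the endpoint URL and table names below are made up; the real workloads are much larger):
```python
from pyspark.sql import SparkSession

# Connect to the Spark Connect endpoint (placeholder URL).
spark = SparkSession.builder.remote("sc://spark-connect-host:15002").getOrCreate()

# Some dataframe to write; in the real workloads this comes from a large query.
df = spark.read.table("source_db.events")

# The call that hangs intermittently: the server finishes the SQL query and
# writes the data, but the client never receives the final response.
df.writeTo("target_db.events_by_day").overwritePartitions()
```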
This seems to be due to the operation staying stuck in `ReadyForExecution` even though the SQL query has finished: the gRPC observer is never notified, and so a response is never sent to the client.
Even after the SQL query has finished, the following logs keep occurring over and over again:
INFO ExecuteGrpcResponseSender: Starting for opId=8ee873b7-32b9-4eb6-963f-21318e2aebcf, reattachable=true, lastConsumedStreamIndex=0
TRACE ExecuteGrpcResponseSender: Trying to get next response with index=1.
TRACE ExecuteGrpcResponseSender: Acquired executionObserver lock.
TRACE ExecuteGrpcResponseSender: Try to get response with index=1 from observer.
TRACE ExecuteGrpcResponseSender: Response index=1 from observer: false
TRACE ExecuteGrpcResponseSender: Wait for response to become available with timeout=120000 ms.{}
The request is still shown in the Spark UI under the Connect tab in the Requests table, with status RUNNING.
Then, a while after shutting down the client, I see the following:
INFO SparkConnectExecutionManager: Started periodic run of SparkConnectExecutionManager maintenance.
INFO SparkConnectExecutionManager: Finished periodic run of SparkConnectExecutionManager maintenance.
INFO SparkConnectExecutionManager: Started periodic run of SparkConnectExecutionManager maintenance.
INFO SparkConnectExecutionManager: Found execution ExecuteInfo(session_id: "9075a58c-f298-496d-afea-c76fc09d37e3", ...,
operation_id: "8ee873b7-32b9-4eb6-963f-21318e2aebcf"
,,9075a58c-f298-496d-afea-c76fc09d37e3,8ee873b7-32b9-4eb6-963f-21318e2aebcf,
SparkConnect_OperationTag_User__Session_9075a58c-f298-496d-afea-c76fc09d37e3_Operation_8ee873b7-32b9-4eb6-963f-21318e2aebcf,
Set(),true,ReadyForExecution,1730282712338,Some(1730290272497),None)
that was abandoned and expired and will be removed.
INFO SparkConnectExecutionManager: ExecuteHolder ExecuteKey(,9075a58c-f298-496d-afea-c76fc09d37e3,8ee873b7-32b9-4eb6-963f-21318e2aebcf) is removed.
INFO ExecuteResponseObserver: Release all for opId=8ee873b7-32b9-4eb6-963f-21318e2aebcf. Execution stats: total=CachedSize(0,0) autoRemoved=CachedSize(0,0) cachedUntilConsumed=CachedSize(0,0) cachedUntilProduced=CachedSize(0,0) maxCachedUntilConsumed=CachedSize(0,0) maxCachedUntilProduced=CachedSize(0,0)
INFO ExecuteResponseObserver: Release all for opId=8ee873b7-32b9-4eb6-963f-21318e2aebcf. Execution stats: total=CachedSize(0,0) autoRemoved=CachedSize(0,0) cachedUntilConsumed=CachedSize(0,0) cachedUntilProduced=CachedSize(0,0) maxCachedUntilConsumed=CachedSize(0,0) maxCachedUntilProduced=CachedSize(0,0)
java.lang.IllegalStateException:
operationId: 8ee873b7-32b9-4eb6-963f-21318e2aebcf with status ReadyForExecution
is not within statuses List(Finished, Failed, Canceled) for event Closed
at org.apache.spark.sql.connect.service.ExecuteEventsManager.assertStatus(ExecuteEventsManager.scala:261)
at org.apache.spark.sql.connect.service.ExecuteEventsManager.postClosed(ExecuteEventsManager.scala:229)
at org.apache.spark.sql.connect.service.ExecuteHolder.$anonfun$close$1(ExecuteHolder.scala:240)
at org.apache.spark.sql.connect.service.ExecuteHolder.$anonfun$close$1$adapted(ExecuteHolder.scala:234)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
My guess is that an exception occurs in `SparkConnectPlanner.handleWriteOperationV2` after the query has succeeded, but before we get to call `executeHolder.eventsManager.postFinished`. However, I have not seen anything in the logs.
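To illustrate the failure mode I suspect, here is a toy Python sketch (not the actual Scala server code; all names in it are invented stand-ins): if anything throws after the query succeeds but before the "finished" event is posted and the observer is completed, the client hang follows naturally.
```python
class ToyExecuteHolder:
    """Invented stand-in for the server-side execution state (not Spark's API)."""

    def __init__(self):
        self.status = "ReadyForExecution"
        self.completed = False

    def post_finished(self):
        self.status = "Finished"

    def on_completed(self):
        # In the real server, completing the response observer is what lets
        # ExecuteGrpcResponseSender send the final response to the client.
        self.completed = True


def handle_write(holder, run_query, post_write_step):
    run_query()             # the SQL query itself succeeds
    post_write_step()       # hypothesis: an exception is thrown around here
    holder.post_finished()  # never reached -> status stays ReadyForExecution
    holder.on_completed()   # never reached -> client is never notified


def failing_step():
    raise RuntimeError("some post-write failure")


holder = ToyExecuteHolder()
try:
    handle_write(holder, run_query=lambda: None, post_write_step=failing_step)
except RuntimeError:
    pass

# Matches the symptom in the logs above: status still ReadyForExecution,
# no completion ever sent, so the client keeps waiting.
print(holder.status, holder.completed)  # -> ReadyForExecution False
```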
I have not been able to construct a proof-of-concept that reproduces the issue, but it occurs very often on real workloads. I am not sure what triggers it: it almost never occurs on 'small' jobs (say, < 30 minutes and < 100,000 partitions), but often (roughly one in five runs) on larger jobs.