Details
Description
It was discovered that a particular data distribution in a DataFrame with a groupBy clause could result in a JVM crash when calling df.rdd.isEmpty().
For example,
import uuid

data = []
for t in range(0, 10000):
    id = str(uuid.uuid4())
    # Skewed distribution: a handful of keys with many duplicates, many keys with few
    if t == 0:
        for i in range(0, 99):
            data.append((id,))
    elif t < 10:
        for i in range(0, 75):
            data.append((id,))
    elif t < 100:
        for i in range(0, 50):
            data.append((id,))
    elif t < 1000:
        for i in range(0, 25):
            data.append((id,))
    else:
        for i in range(0, 10):
            data.append((id,))

df = self.spark.createDataFrame(data, ["col"])
df.coalesce(1).write.parquet(tmpPath)

res = self.spark.read.parquet(tmpPath).groupBy("col").count()
print(res.rdd.isEmpty())  # crashes JVM
The crash is reproducible 100% of the time on this dataset.
This ticket is related to (and can be thought of as a follow-up to) https://issues.apache.org/jira/browse/SPARK-33277. We need to patch one more place to make sure the Python iterator stays in sync with the Java iterator and is terminated whenever the task is marked as completed.
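The required guard is the same shape as the one SPARK-33277 added on the JVM side: wrap the iterator that feeds the Python runner so it stops producing rows as soon as the task is completed or interrupted. A minimal sketch of that pattern follows; the class name and wiring here are illustrative only, not the actual Spark code.

import org.apache.spark.TaskContext

// Illustrative sketch of the guard pattern: stop yielding rows once the task
// has been marked completed or interrupted, so the Python-side consumer can
// never read past data the JVM side has already released.
class TaskScopedIterator[T](context: TaskContext, delegate: Iterator[T]) extends Iterator[T] {
  override def hasNext: Boolean =
    !context.isCompleted() && !context.isInterrupted() && delegate.hasNext

  override def next(): T = delegate.next()
}

With such a wrapper in place at the remaining call site, the Python iterator cannot run ahead of the Java iterator after task completion, which is presumably what triggers the crash in the isEmpty() case above.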
Note that all other operations (e.g. count, collect) appear to work fine.
Issue Links
- relates to SPARK-33277: Python/Pandas UDF right after off-heap vectorized reader could cause executor crash (Resolved)