Details
- Type: Bug
- Status: Open
- Priority: Major
- Resolution: Unresolved
- Affects Version/s: 3.0.0
- Fix Version/s: None
- Labels: None
- Environment:
  - PySpark 3.0.0
  - PyArrow 1.0.1 (also tried with PyArrow 0.15.1; no progress there)
  - Pandas 0.25.3
Description
Hi,
I am facing issues running a Pandas UDF on a YARN cluster with multiple nodes. I am trying to apply a simple DBSCAN clustering algorithm to multiple groups in my dataframe. To start with, I am using a small example to test things out:
import pandas as pd
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType
from sklearn.cluster import DBSCAN
from pyspark.sql.functions import pandas_udf, PandasUDFType

data = [(1, 11.6133, 48.1075), (1, 11.6142, 48.1066), (1, 11.6108, 48.1061),
        (1, 11.6207, 48.1192), (1, 11.6221, 48.1223), (1, 11.5969, 48.1276),
        (2, 11.5995, 48.1258), (2, 11.6127, 48.1066), (2, 11.6430, 48.1275),
        (2, 11.6368, 48.1278), (2, 11.5930, 48.1156)]

df = spark.createDataFrame(data, ["id", "X", "Y"])

output_schema = StructType(
    [
        StructField('id', IntegerType()),
        StructField('X', DoubleType()),
        StructField('Y', DoubleType()),
        StructField('cluster', IntegerType())
    ]
)

@pandas_udf(output_schema, PandasUDFType.GROUPED_MAP)
def dbscan(data):
    data["cluster"] = DBSCAN(eps=5, min_samples=3).fit_predict(data[["X", "Y"]])
    result = pd.DataFrame(data, columns=["id", "X", "Y", "cluster"])
    return result

res = df.groupby("id").apply(dbscan)
res.show()
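To rule out the UDF body itself, I also ran the same per-group logic through plain pandas, without Spark; this is just a local sanity check (the function name and column names mirror the snippet above), and it finishes instantly:

```python
# Local sanity check of the per-group clustering logic, no Spark involved.
import pandas as pd
from sklearn.cluster import DBSCAN

data = [(1, 11.6133, 48.1075), (1, 11.6142, 48.1066), (1, 11.6108, 48.1061),
        (1, 11.6207, 48.1192), (1, 11.6221, 48.1223), (1, 11.5969, 48.1276),
        (2, 11.5995, 48.1258), (2, 11.6127, 48.1066), (2, 11.6430, 48.1275),
        (2, 11.6368, 48.1278), (2, 11.5930, 48.1156)]
pdf = pd.DataFrame(data, columns=["id", "X", "Y"])

def dbscan(group):
    # Same body as the pandas UDF: label each row with its DBSCAN cluster.
    group = group.copy()
    group["cluster"] = DBSCAN(eps=5, min_samples=3).fit_predict(group[["X", "Y"]])
    return group

# Apply the function to each id-group, as groupby().apply(dbscan) would on Spark.
result = pd.concat(dbscan(g) for _, g in pdf.groupby("id"))
print(result)
```

This runs in well under a second locally, so the clustering itself is not the bottleneck. (As I understand it, on Spark 3.0 the same function could also be wired up through the newer `df.groupby("id").applyInPandas(dbscan, schema=output_schema)` API instead of `PandasUDFType.GROUPED_MAP`, but I see the same hang either way.)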
The code keeps running forever on the YARN cluster. I expect it to finish within seconds (it works fine in standalone mode and finishes in 2-4 seconds). Checking the Spark UI, I can see that the Spark job is stuck (99/580 tasks) and never makes progress.
It also doesn't run in parallel. Am I missing something?
I am new to Spark and still trying to understand a lot of things.