Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 3.5.0
Fix Version/s: None
Component/s: None
Description
We have an OpenShift cluster with Spark and JupyterHub, and we use Spark-Connect to access Spark from within Jupyter. This worked fine with Spark 3.4.1. However, after upgrading to Spark 3.5.0, we were no longer able to access any data in our Delta tables through Spark. Initially I assumed it was a bug in Delta: https://github.com/delta-io/delta/issues/2235
The actual error is
SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 13) (172.31.15.72 executor 4): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
However, after further investigation I discovered that this is a regression in Spark 3.5.0. The issue is similar to SPARK-36917, but I am not using any custom functions, nor any classes other than spark-connect, and this setup worked in 3.4.1. The issue only occurs when remote executors are used in a Kubernetes environment. Running a plain Spark-Connect server, e.g.
./sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.0
doesn't produce the error.
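For comparison, here is a minimal sketch of the same query run against that locally started server, assuming it listens on the default Spark Connect port 15002 and that the data.csv file from the steps below exists on the same machine; in this setup the count completes without the ClassCastException:

# Cross-check against a locally started connect server (no Kubernetes executors).
# Assumes the default Spark Connect port 15002 and a local copy of data.csv.
from pyspark.sql import SparkSession

spark = SparkSession.builder.remote('sc://localhost:15002').getOrCreate()
df = spark.read.csv('/opt/spark/work-dir/data.csv')
print(df.count())  # succeeds here; the failure only appears with Kubernetes executors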
The issue occurs both in a full OpenShift cluster and in a tiny minikube setup. The steps to reproduce below are based on the minikube setup.
You need a minimal Spark 3.5.0 setup with 1 driver and at least 1 executor, and Python to access data through Spark. The query I used to test this is:
from pyspark.sql import SparkSession

logFile = '/opt/spark/work-dir/data.csv'
spark = SparkSession.builder.remote('sc://spark-connect').getOrCreate()
df = spark.read.csv(logFile)
df.count()
However, it doesn't matter whether the data is local or remote on S3 storage, nor whether the data is plain text, CSV or a Delta table, as illustrated in the sketch below.
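For illustration, a sketch of the variants that were tried; the S3 bucket and Delta table paths below are placeholders rather than the actual paths, and they assume s3a credentials and the Delta Lake package are configured on the connect server. Each variant fails with the same ClassCastException as soon as the count() action is scheduled on a Kubernetes executor:

# Illustrative variants; bucket and table paths are placeholders, not the real ones.
from pyspark.sql import SparkSession

spark = SparkSession.builder.remote('sc://spark-connect').getOrCreate()

spark.read.text('/opt/spark/work-dir/data.csv').count()                   # local plain text
spark.read.csv('s3a://some-bucket/data.csv').count()                      # CSV on S3 (s3a access assumed configured)
spark.read.format('delta').load('s3a://some-bucket/delta-table').count()  # Delta table (Delta package assumed on the server)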
Steps to reproduce:
- Install minikube
- Create a service account 'spark'
kubectl create sa spark
- Bind the 'edit' role to the service account
kubectl create rolebinding spark-edit \
  --clusterrole=edit \
  --serviceaccount=default:spark \
  --namespace=default
- Create a service for spark
kubectl create -f service.yml
- Create a Spark-Connect deployment with the default Spark docker image: https://hub.docker.com/_/spark (do change the deployment.yml to point to the Kubernetes API endpoint)
kubectl create -f deployment.yml
- Add data to both the executor and the driver pods, e.g. login on the terminal of the pods and run on both pods
touch data.csv
echo id,name > data.csv
echo 1,2 >> data.csv
- Start a Spark-Connect remote session to access the newly created data. I logged in on the driver pod and installed the necessary Python packages:
python3 -m pip install pandas pyspark grpcio-tools grpcio-status pyarrow
Then I started a Python shell and executed the same query as above, which fails with the ClassCastException from the error message:
from pyspark.sql import SparkSession

logFile = '/opt/spark/work-dir/data.csv'
spark = SparkSession.builder.remote('sc://spark-connect').getOrCreate()
df = spark.read.csv(logFile)
df.count()
Necessary files:
service.yml:
apiVersion: v1
kind: Service
metadata:
  labels:
    app: spark-connect
  name: spark-connect
  namespace: default
spec:
  ipFamilies:
    - IPv4
  ports:
    - name: connect-grpc
      protocol: TCP
      port: 15002        # Port the service listens on.
      targetPort: 15002  # Port on the backing pods to which the service forwards connections.
    - name: sparkui
      protocol: TCP
      port: 4040
      targetPort: 4040
    - name: spark-rpc
      protocol: TCP
      port: 7078
      targetPort: 7078
    - name: blockmanager
      protocol: TCP
      port: 7079
      targetPort: 7079
  internalTrafficPolicy: Cluster
  type: ClusterIP
  ipFamilyPolicy: SingleStack
  sessionAffinity: None
  selector:
    app: spark-connect
deployment.yml: (do replace the spark.master URL with the correct one for your setup)
kind: Deployment
apiVersion: apps/v1
metadata:
  name: spark-connect
  namespace: default
  uid: 3a1b448e-4594-47a9-95f6-a82ea4ac9341
  resourceVersion: '6107'
  generation: 23
  creationTimestamp: '2023-10-31T13:35:46Z'
  labels:
    k8s-app: spark-connect
spec:
  replicas: 1
  selector:
    matchLabels:
      k8s-app: spark-connect
  template:
    metadata:
      name: spark-connect
      creationTimestamp: null
      labels:
        k8s-app: spark-connect
    spec:
      serviceAccount: spark
      containers:
        - name: spark-connect
          image: spark
          command:
            - /opt/entrypoint.sh
            - driver
          args:
            - '--class'
            - org.apache.spark.sql.connect.service.SparkConnectServer
            - '--name'
            - spark-connect
            - '--conf'
            - spark.driver.blockManager.port=7079
            - '--conf'
            - spark.driver.port=7078
            - '--conf'
            - spark.driver.host=spark-connect
            - '--conf'
            - spark.master=k8s://https://<Kubernetes API Address eg https://192.168.49.2:8443>
            - '--conf'
            - spark.kubernetes.namespace=default
            - '--conf'
            - spark.kubernetes.container.image=spark:latest
            - '--conf'
            - spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp
            - '--conf'
            - spark.driver.extraJavaOptions=-Divy.home=/tmp
            - '--conf'
            - spark.kubernetes.driver.label.app.kubernetes.io/part-of=spark-connect
            - '--conf'
            - spark.kubernetes.executor.label.app=spark-connect
            - '--conf'
            - spark.executor.memory=1g
            - '--conf'
            - spark.executor.cores=1
            - '--conf'
            - spark.executor.instances=1
            - '--conf'
            - spark.kubernetes.executor.podNamePrefix=spark-connect
            - '--packages'
            - org.apache.spark:spark-connect_2.12:3.5.0
          env:
            - name: SPARK_DRIVER_BIND_ADDRESS
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.podIP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          imagePullPolicy: Always
          securityContext:
            privileged: true
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
      dnsPolicy: ClusterFirst
      securityContext: {}
      schedulerName: default-scheduler
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 25%
      maxSurge: 25%
  revisionHistoryLimit: 10
  progressDeadlineSeconds: 600