Spark / SPARK-45769

Data retrieval fails on executors with Spark Connect


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.5.0
    • Fix Version/s: None
    • Component/s: Connect
    • Labels: None

    Description

      We have an OpenShift cluster with Spark and JupyterHub, and we use Spark Connect to access Spark from within Jupyter. This worked fine with Spark 3.4.1. However, after upgrading to Spark 3.5.0 we were no longer able to access any data in our Delta tables through Spark. Initially I assumed it was a bug in Delta: https://github.com/delta-io/delta/issues/2235

      The actual error is:

      SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 13) (172.31.15.72 executor 4): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD

      However, after further investigation I discovered that this is a regression in Spark 3.5.0. The issue is similar to SPARK-36917; however, I am not using any custom functions or any classes other than spark-connect, and this setup worked in 3.4.1. The issue only occurs when remote executors are used in a Kubernetes environment. Running a plain Spark Connect server, e.g.

      ./sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.0

      doesn't produce the error.

      The issue occurs both in a full OpenShift cluster and in a tiny minikube setup. The steps to reproduce below are based on the minikube setup.

      You need a minimal Spark 3.5.0 setup with 1 driver and at least 1 executor, and you need to use Python to access data through Spark. The query I used to test this is:

      from pyspark.sql import SparkSession
      logFile = '/opt/spark/work-dir/data.csv'
      spark = SparkSession.builder.remote('sc://spark-connect').getOrCreate()
      df = spark.read.csv(logFile)
      df.count()

      It doesn't matter whether the data is local or remote on S3 storage, nor whether it is plain text, CSV, or a Delta table.
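      The remote string above, sc://spark-connect, omits a port; Spark Connect endpoints take the form sc://host[:port], and the client falls back to the default Connect port 15002 when none is given, which is why service.yml below exposes 15002. A small stdlib sketch of that resolution (the resolve_endpoint helper is illustrative, not Spark API):

      ```python
      from urllib.parse import urlsplit

      DEFAULT_CONNECT_PORT = 15002  # default Spark Connect gRPC port

      def resolve_endpoint(remote):
          """Split an sc:// remote string into (host, port), applying the default port."""
          parts = urlsplit(remote)
          assert parts.scheme == "sc", f"unexpected scheme: {parts.scheme}"
          return parts.hostname, parts.port or DEFAULT_CONNECT_PORT

      # The endpoint used in the repro resolves to the 'spark-connect' service on 15002:
      print(resolve_endpoint("sc://spark-connect"))  # → ('spark-connect', 15002)
      ```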

      Steps to reproduce:

      1. Install minikube
      2. Create a service account 'spark':
         kubectl create sa spark
      3. Bind the 'edit' role to the service account:
         kubectl create rolebinding spark-edit \
          --clusterrole=edit \
          --serviceaccount=default:spark \
          --namespace=default
      4. Create a service for Spark Connect:
         kubectl create -f service.yml
      5. Create a Spark-Connect deployment with the default Spark Docker image: https://hub.docker.com/_/spark (do change deployment.yml to point to your Kubernetes API endpoint):
         kubectl create -f deployment.yml
      6. Add data to both the executor and the driver pods, e.g. log in on the terminal of each pod and run:
         echo id,name > data.csv
         echo 1,2 >> data.csv
      7. Start a Spark remote session to access the newly created data. I logged in on the driver pod and installed the necessary Python packages:
         python3 -m pip install pandas pyspark grpcio-tools grpcio-status pyarrow

      Then I started a Python shell and executed:

      from pyspark.sql import SparkSession
      logFile = '/opt/spark/work-dir/data.csv'
      spark = SparkSession.builder.remote('sc://spark-connect').getOrCreate()
      df = spark.read.csv(logFile)
      df.count() 
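      For reference on what a successful run returns: the data.csv created in step 6 contains a header line plus one data row, and spark.read.csv with default options does not skip the header, so df.count() should return 2. A stdlib sketch of that expectation:

      ```python
      import csv
      import io

      # Contents written by the echo commands in step 6
      data = "id,name\n1,2\n"

      # spark.read.csv(path) with default options does not treat the first
      # line as a header, so every physical line becomes a row.
      rows = list(csv.reader(io.StringIO(data)))
      print(len(rows))  # → 2
      ```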

      Necessary files:

      service.yml:

      apiVersion: v1
      kind: Service
      metadata:
        labels:
          app: spark-connect        
        name: spark-connect
        namespace: default
      spec:
        ipFamilies:
          - IPv4
        ports:
          - name: connect-grpc
            protocol: TCP
            port: 15002 # Port the service listens on.
            targetPort: 15002 # Port on the backing pods to which the service forwards connections
          - name: sparkui
            protocol: TCP
            port: 4040 # Port the service listens on.
            targetPort: 4040 # Port on the backing pods to which the service forwards connections
          - name: spark-rpc
            protocol: TCP
            port: 7078 # Port the service listens on.
            targetPort: 7078 # Port on the backing pods to which the service forwards connections
          - name: blockmanager
            protocol: TCP
            port: 7079 # Port the service listens on.
            targetPort: 7079 # Port on the backing pods to which the service forwards connections
        internalTrafficPolicy: Cluster
        type: ClusterIP
        ipFamilyPolicy: SingleStack
        sessionAffinity: None
        selector:
          app: spark-connect 

      deployment.yml: (do replace the spark.master URL with the correct one for your setup)

      kind: Deployment
      apiVersion: apps/v1
      metadata:
         name: spark-connect
         namespace: default
         labels:
           k8s-app: spark-connect
      spec:
         replicas: 1
         selector:
           matchLabels:
             k8s-app: spark-connect
         template:
           metadata:
             name: spark-connect
             labels:
               k8s-app: spark-connect
           spec:
             serviceAccount: spark
             containers:
               - name: spark-connect
                 image: spark
                 command:
                   - /opt/entrypoint.sh
                   - driver
                 args:
                   - '--class'
                   - org.apache.spark.sql.connect.service.SparkConnectServer
                   - '--name'
                   - spark-connect
                   - '--conf'
                   - spark.driver.blockManager.port=7079
                   - '--conf'
                   - spark.driver.port=7078
                   - '--conf'
                   - spark.driver.host=spark-connect
                   - '--conf'
                   - spark.master=k8s://https://<Kubernetes API Address eg https://192.168.49.2:8443>
                   - '--conf'
                   - spark.kubernetes.namespace=default
                   - '--conf'
                   - spark.kubernetes.container.image=spark:latest
                   - '--conf'
                   - spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp
                   - '--conf'
                   - spark.kubernetes.driver.label.app.kubernetes.io/part-of=spark-connect
                   - '--conf'
                   - spark.kubernetes.executor.label.app=spark-connect
                   
                   - '--conf'
                   - spark.executor.memory=1g
                   - '--conf'
                   - spark.executor.cores=1
                   - '--conf'
                   - spark.executor.instances=1
                   - '--conf'
                   - spark.kubernetes.executor.podNamePrefix=spark-connect
                   - '--packages'
                   - org.apache.spark:spark-connect_2.12:3.5.0
                 env:
                   - name: SPARK_DRIVER_BIND_ADDRESS
                     valueFrom:
                       fieldRef:
                         apiVersion: v1
                         fieldPath: status.podIP
                 resources: {}
                 terminationMessagePath: /dev/termination-log
                 terminationMessagePolicy: File
                 imagePullPolicy: Always
                 securityContext:
                   privileged: true
             restartPolicy: Always
             terminationGracePeriodSeconds: 30
             dnsPolicy: ClusterFirst
             securityContext: {}
             schedulerName: default-scheduler
         strategy:
           type: RollingUpdate
           rollingUpdate:
             maxUnavailable: 25%
             maxSurge: 25%
         revisionHistoryLimit: 10
         progressDeadlineSeconds: 600 

      People

        Assignee: Unassigned
        Reporter: Steven Ottens (stvno)
        Votes: 1
        Watchers: 6