Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
3.2.1
-
None
-
None
-
About the k8s setup:
- 6+ nodes in AWS
- 4 nodes in DC
Spark 3.2.1 + spark-hadoop-cloud 3.2.1
JAVA_HOME=/Users/gleb.abroskin/Library/Java/JavaVirtualMachines/corretto-11.0.13/Contents/Home spark-submit \ --master k8s://https://kubemaster:6443 \ --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///opt/spark/jars/log4j-trace.properties" \ --conf spark.submit.deployMode=cluster \ --conf spark.kubernetes.namespace=ml \ --conf spark.kubernetes.container.image=SPARK_WITH_TRACE_LOGS_BAKED_IN \ --conf spark.kubernetes.container.image.pullSecrets=aws-ecr \ --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-submitter \ --conf spark.kubernetes.authenticate.submission.oauthToken=XXX \ --conf spark.kubernetes.executor.podNamePrefix=ds-224-executor-test \ --conf spark.kubernetes.file.upload.path=s3a://ifunny-ml-data/dev/spark \ --conf "spark.hadoop.fs.s3a.access.key=XXX" \ --conf "spark.hadoop.fs.s3a.secret.key=XXX" \ --conf spark.hadoop.fs.s3a.endpoint=s3.us-east-1.amazonaws.com \ --conf "spark.kubernetes.driverEnv.AWS_ACCESS_KEY_ID=XXX" \ --conf "spark.kubernetes.driverEnv.AWS_SECRET_ACCESS_KEY=XXX" \ --conf spark.sql.shuffle.partitions=500 \ --num-executors 100 \ --driver-java-options="-Dlog4j.configuration=file:///opt/spark/jars/log4j-trace.properties" \ --name k8s-pyspark-test \ main.py
main.py is just pi.py from the examples, modified to work on 100 machines (this is a way to make sure executors are deployed in both AWS & DC)
import sys from random import random from operator import add from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession \ .builder \ .appName("PythonPi") \ .getOrCreate() spark.sparkContext.setLogLevel("TRACE") partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 100 n = 10000000 * partitions def f(_: int) -> float: x = random() * 2 - 1 y = random() * 2 - 1 return 1 if x ** 2 + y ** 2 <= 1 else 0 count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add) print("Pi is roughly %f" % (4.0 * count / n))
About the k8s setup: 6+ nodes in AWS 4 nodes in DC Spark 3.2.1 + spark-hadoop-cloud 3.2.1 JAVA_HOME=/Users/gleb.abroskin/Library/Java/JavaVirtualMachines/corretto-11.0.13/Contents/Home spark-submit \ --master k8s: //https://kubemaster:6443 \ --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file: ///opt/spark/jars/log4j-trace.properties" \ --conf spark.submit.deployMode=cluster \ --conf spark.kubernetes.namespace=ml \ --conf spark.kubernetes.container.image=SPARK_WITH_TRACE_LOGS_BAKED_IN \ --conf spark.kubernetes.container.image.pullSecrets=aws-ecr \ --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-submitter \ --conf spark.kubernetes.authenticate.submission.oauthToken=XXX \ --conf spark.kubernetes.executor.podNamePrefix=ds-224-executor-test \ --conf spark.kubernetes.file.upload.path=s3a: //ifunny-ml-data/dev/spark \ --conf "spark.hadoop.fs.s3a.access.key=XXX" \ --conf "spark.hadoop.fs.s3a.secret.key=XXX" \ --conf spark.hadoop.fs.s3a.endpoint=s3.us-east-1.amazonaws.com \ --conf "spark.kubernetes.driverEnv.AWS_ACCESS_KEY_ID=XXX" \ --conf "spark.kubernetes.driverEnv.AWS_SECRET_ACCESS_KEY=XXX" \ --conf spark.sql.shuffle.partitions=500 \ --num-executors 100 \ --driver-java-options= "-Dlog4j.configuration=file: ///opt/spark/jars/log4j-trace.properties" \ --name k8s-pyspark-test \ main.py main.py is just pi.py from the examples, modified to work on 100 machines (this is a way to make sure executors are deployed in both AWS & DC) import sys from random import random from operator import add from pyspark.sql import SparkSession if __name__ == "__main__" : spark = SparkSession \ .builder \ .appName( "PythonPi" ) \ .getOrCreate() spark.sparkContext.setLogLevel( "TRACE" ) partitions = int (sys.argv[1]) if len(sys.argv) > 1 else 100 n = 10000000 * partitions def f(_: int ) -> float : x = random() * 2 - 1 y = random() * 2 - 1 return 1 if x ** 2 + y ** 2 <= 1 else 0 count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add) print( "Pi is roughly %f" % (4.0 * count / n))
Description
I understand that the issue is quite subtle and might be hard to debug, still I was not able to find issue with our infra, so I guess that is something inside the spark.
We deploy spark application in k8s and everything works well, if all the driver & executor pods are either in AWS or our DC, but in case they are split between datacenters something strange happens, for example, logs of one of the executors inside the DC
22/08/26 07:55:35 INFO TransportClientFactory: Successfully created connection to /172.19.149.92:39414 after 50 ms (1 ms spent in bootstraps) 22/08/26 07:55:35 TRACE TransportClient: Sending RPC to /172.19.149.92:39414 22/08/26 07:55:35 TRACE TransportClient: Sending request RPC 4860401977118244334 to /172.19.149.92:39414 took 3 ms 22/08/26 07:55:35 DEBUG TransportClient: Sending fetch chunk request 0 to /172.19.149.92:39414 22/08/26 07:55:35 TRACE TransportClient: Sending request StreamChunkId[streamId=1644979023003,chunkIndex=0] to /172.19.149.92:39414 took 0 ms 22/08/26 07:57:35 ERROR TransportChannelHandler: Connection to /172.19.149.92:39414 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.shuffle.io.connectionTimeout if this is wrong.
The executor successfully creates connection & sends the request, but the connection was assumed dead. Even stranger the executor on ip 172.19.149.92 have sent the response back, which I can confirm with following logs
22/08/26 07:55:35 TRACE MessageDecoder: Received message ChunkFetchRequest: ChunkFetchRequest[streamChunkId=StreamChunkId[streamId=1644979023003,chunkIndex=0]] 22/08/26 07:55:35 TRACE ChunkFetchRequestHandler: Received req from /172.19.123.197:37626 to fetch block StreamChunkId[streamId=1644979023003,chunkIndex=0] 22/08/26 07:55:35 TRACE OneForOneStreamManager: Removing stream id 1644979023003 22/08/26 07:55:35 TRACE BlockInfoManager: Task -1024 releasing lock for broadcast_0_piece0 -- 22/08/26 07:55:35 TRACE BlockInfoManager: Task -1024 releasing lock for broadcast_0_piece0 22/08/26 07:55:35 TRACE ChunkFetchRequestHandler: Sent result ChunkFetchSuccess[streamChunkId=StreamChunkId[streamId=1644979023003,chunkIndex=0],buffer=org.apache.spark.storage.BlockManagerManagedBuffer@79b43e2a] to client /172.19.123.197:37626
A few suspicious moments here:
- connection to pod looks like /<IP>, while connection to driver looks like <POD_NAME>.<NAMESPACE>.svc/<IP>
- Task -1024 releasing lock for broadcast_0_piece0