Details
-
Bug
-
Status: Closed
-
Trivial
-
Resolution: Not A Bug
-
3.4.0, 3.5.3
-
None
-
None
-
Context:
- Issue appears when using spark 3.4.0 or 3.5.3; it does not appear when using spark 3.2.0 or 3.3.0
- Spark is run on Kubernetes; data is read from s3
Overview of spark context (sensitive items are redacted):
spark.sparkContext.getConf().getAll() [('spark.eventLog.enabled', 'true'), ('spark.kubernetes.driverEnv.AWS_REGION', 'eu-west-1'), ('spark.network.crypto.enabled', 'true'), ('spark.kubernetes.allocation.batch.size', '10'), ('spark.kubernetes.container.image.pullSecrets', 'ecr-credentials'), ('spark.hadoop.fs.s3a.bucket.XXX.server-side-encryption.key', 'arn:aws:kms:eu-west-1:XXX:key/XXX'), ('spark.kubernetes.executor.podNamePrefix', 'XXX'), ('spark.hadoop.fs.s3a.server-side-encryption-algorithm', 'SSE-KMS'), ('spark.driver.extraJavaOptions', '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false'), ('spark.serializer', 'org.apache.spark.serializer.KryoSerializer'), ('spark.hadoop.fs.s3a.endpoint.region', 'eu-west-1'), ('spark.executor.instances', '10'), ('spark.hadoop.fs.s3.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem'), ('spark.hadoop.fs.s3a.bucket.XXX.server-side-encryption-algorithm', 'SSE-KMS'), ('spark.sql.parquet.compression.codec', 'snappy'), ('spark.eventLog.dir', 's3a://XXX'), ('spark.sql.adaptive.enabled', 'False'), ('spark.kubernetes.container.image.pullPolicy', 'Always'), ('spark.kubernetes.driver.annotation.iam.amazonaws.com/role', 'XXX'), ('spark.executor.memory', '4g'), ('spark.sql.session.timeZone', 'CET'), ('spark.executor.id', 'driver'), ('spark.executor.extraJavaOptions', '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false -Dlog4j.debug=true -Dlog4j.logger.org.apache.hadoop=DEBUG'), ('spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version', '2'), ('spark.driver.host', 'XXX'), ('spark.sql.warehouse.dir', 'file:XXX'), ('spark.jars', 'XXX'), ('spark.sql.sources.partitionColumnTypeInference.enabled', 'false'), ('spark.sql.debug.maxToStringFields', '1000'), ('spark.hadoop.fs.s3a.server-side-encryption.key', 'arn:aws:kms:eu-west-1:XXX:key/XXX'), ('spark.authenticate', 'true'), ('spark.hadoop.fs.s3a.multiobjectdelete.enable', 'false'), ('spark.app.initial.archive.urls', 'XXX'), ('spark.hadoop.fs.s3a.aws.credentials.provider', 'com.amazonaws.auth.DefaultAWSCredentialsProviderChain'), ('spark.app.name', 'XXX'), ('spark.kubernetes.executor.request.cores', '1000m'), ('spark.kubernetes.pyspark.pythonVersion', '3'), ('spark.io.encryption.enabled', 'true'), ('spark.serializer.objectStreamReset', '100'), ('spark.archives', 'XXX'), ('spark.kubernetes.executor.limit.cores', '1000m'), ('spark.sql.pyspark.jvmStacktrace.enabled', 'True'), ('spark.submit.deployMode', 'client'), ('spark.kubernetes.driver.request.cores', '1000m'), ('spark.driver.cores', '1'), ('spark.repl.local.jars', 'XXX'), ('spark.app.submitTime', '1733211972479'), ('spark.sql.avro.compression.codec', 'snappy'), ('spark.app.startTime', '1733211972711'), ('spark.logConf', 'true'), ('spark.master', 'k8s://https://kubernetes.default.svc.cluster.local'), ('spark.kubernetes.namespace', 'dbe'), ('spark.kubernetes.executor.annotation.iam.amazonaws.com/role', 'XXX'), ('spark.app.id', 'XXX'), ('spark.driver.port', '40163'), ('spark.app.initial.jar.urls', 'XXX'), ('spark.kubernetes.container.image', 'XXX.dkr.ecr.eu-west-1.amazonaws.com/public/spark-k8s:3.4.0'), ('spark.rdd.compress', 'True'), ('spark.driver.memory', '2g'), ('spark.submit.pyFiles', ''), ('spark.kubernetes.authenticate.driver.serviceAccountName', 'XXX'),
Context: Issue appears when using spark 3.4.0 or 3.5.3; it does not appear when using spark 3.2.0 or 3.3.0 Spark is run on Kubernetes; data is read from s3 Overview of spark context (sensitive items are redacted): spark.sparkContext.getConf().getAll() [( 'spark.eventLog.enabled' , ' true ' ), ( 'spark.kubernetes.driverEnv.AWS_REGION' , 'eu-west-1' ), ( 'spark.network.crypto.enabled' , ' true ' ), ( 'spark.kubernetes.allocation.batch.size' , '10' ), ( 'spark.kubernetes.container.image.pullSecrets' , 'ecr-credentials' ), ( 'spark.hadoop.fs.s3a.bucket.XXX.server-side-encryption.key' , 'arn:aws:kms:eu-west-1:XXX:key/XXX' ), ( 'spark.kubernetes.executor.podNamePrefix' , 'XXX' ), ( 'spark.hadoop.fs.s3a.server-side-encryption-algorithm' , 'SSE-KMS' ), ( 'spark.driver.extraJavaOptions' , '-Djava.net.preferIPv6Addresses= false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle= false ' ), ( 'spark.serializer' , 'org.apache.spark.serializer.KryoSerializer' ), ( 'spark.hadoop.fs.s3a.endpoint.region' , 'eu-west-1' ), ( 'spark.executor.instances' , '10' ), ( 'spark.hadoop.fs.s3.impl' , 'org.apache.hadoop.fs.s3a.S3AFileSystem' ), ( 'spark.hadoop.fs.s3a.bucket.XXX.server-side-encryption-algorithm' , 'SSE-KMS' ), ( 'spark.sql.parquet.compression.codec' , 'snappy' ), ( 'spark.eventLog.dir' , 's3a: //XXX' ), ( 'spark.sql.adaptive.enabled' , 'False' ), ( 'spark.kubernetes.container.image.pullPolicy' , 'Always' ), ( 'spark.kubernetes.driver.annotation.iam.amazonaws.com/role' , 'XXX' ), ( 'spark.executor.memory' , '4g' ), ( 'spark.sql.session.timeZone' , 'CET' ), ( 'spark.executor.id' , 'driver' ), ( 'spark.executor.extraJavaOptions' , '-Djava.net.preferIPv6Addresses= false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle= false -Dlog4j.debug= true -Dlog4j.logger.org.apache.hadoop=DEBUG' ), ( 'spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version' , '2' ), ( 'spark.driver.host' , 'XXX' ), ( 'spark.sql.warehouse.dir' , 'file:XXX' ), ( 'spark.jars' , 'XXX' ), ( 'spark.sql.sources.partitionColumnTypeInference.enabled' , ' false ' ), ( 'spark.sql.debug.maxToStringFields' , '1000' ), ( 'spark.hadoop.fs.s3a.server-side-encryption.key' , 'arn:aws:kms:eu-west-1:XXX:key/XXX' ), ( 'spark.authenticate' , ' true ' ), ( 'spark.hadoop.fs.s3a.multiobjectdelete.enable' , ' false ' ), ( 'spark.app.initial.archive.urls' , 'XXX' ), ( 'spark.hadoop.fs.s3a.aws.credentials.provider' , 'com.amazonaws.auth.DefaultAWSCredentialsProviderChain' ), ( 'spark.app.name' , 'XXX' ), ( 'spark.kubernetes.executor.request.cores' , '1000m' ), ( 'spark.kubernetes.pyspark.pythonVersion' , '3' ), ( 'spark.io.encryption.enabled' , ' true ' ), ( 'spark.serializer.objectStreamReset' , '100' ), ( 'spark.archives' , 'XXX' ), ( 'spark.kubernetes.executor.limit.cores' , '1000m' ), ( 'spark.sql.pyspark.jvmStacktrace.enabled' , 'True' ), ( 'spark.submit.deployMode' , 'client' ), ( 'spark.kubernetes.driver.request.cores' , '1000m' ), ( 'spark.driver.cores' , '1' ), ( 'spark.repl.local.jars' , 'XXX' ), ( 'spark.app.submitTime' , '1733211972479' ), ( 'spark.sql.avro.compression.codec' , 'snappy' ), ( 'spark.app.startTime' , '1733211972711' ), ( 'spark.logConf' , ' true ' ), ( 'spark.master' , 'k8s: //https://kubernetes. default .svc.cluster.local' ), ( 'spark.kubernetes.namespace' , 'dbe' ), ( 'spark.kubernetes.executor.annotation.iam.amazonaws.com/role' , 'XXX' ), ( 'spark.app.id' , 'XXX' ), ( 'spark.driver.port' , '40163' ), ( 'spark.app.initial.jar.urls' , 'XXX' ), ( 'spark.kubernetes.container.image' , 'XXX.dkr.ecr.eu-west-1.amazonaws.com/ public /spark-k8s:3.4.0' ), ( 'spark.rdd.compress' , 'True' ), ( 'spark.driver.memory' , '2g' ), ( 'spark.submit.pyFiles' , ''), ( 'spark.kubernetes.authenticate.driver.serviceAccountName' , 'XXX' ),
-
Important
Description
When joining two dataframes, Spark throws a NullPointerException
from pyspark.sql.functions import lit import logging from pyspark.sql.functions import col from pyspark.sql import functions as FPREFIX_PP = "__GDPR__PP__" path = "s3://XXX/my_dataset.parquet" pdate = "20241022" psource = "MY_PSOURCE" input_df = spark.read.parquet(path+"/pdate="+pdate+"/psource="+psource).withColumn("pdate",lit(pdate)).withColumn("psource",lit(psource)) df = input_df nr_cols = 20 pp_df = spark.read.parquet("s3://XXX/my_privacy.parquet/pdate=20241125/psource=MY_PSOURCE/") party_privacy_columns = [
f"col{i}" for i in range(nr_cols)
] + [pp_join_field] pp_df = pp_df.select(*party_privacy_columns)for col_name in pp_df.columns: # rename pp columns new_col_name = PREFIX_PP + col_name pp_df = pp_df.withColumnRenamed(col_name, new_col_name)data_controller_column = "CPN_NO" col_cc_join_field = f"{PREFIX_PP}CPN_NO" # Let's try to select only the columns needed to join print(len(df.columns)) nr_cols_keep = 130 # Fails at 125 #pp_df = pp_df.select(col(col_pp_join_field),col(col_cc_join_field)) df = df.select(col(party_identifier_attrib),col(data_controller_column),*df.columns[:nr_cols_keep])print(df.where((col(party_identifier_attrib).isNull()) | (col(party_identifier_attrib) == "")).count()) print(df.where((col(data_controller_column).isNull()) | (col(data_controller_column) == "")).count()) # Let's try to drop records from pp_df that have null for col_pp_join_field or col_cc_join_field pp_df = pp_df.dropna(how='any',subset=[col_pp_join_field,col_cc_join_field]) df = df.dropna(how='any',subset=[party_identifier_attrib,data_controller_column]) print(df_unknown_customers.count()) df_unknown_customers.printSchema() df_unknown_customers.write.format("noop").mode("overwrite").save() print("unknown customers ok")df_known_customers = df.join(pp_df, (col(party_identifier_attrib).eqNullSafe(col(col_pp_join_field))) & (col(data_controller_column).eqNullSafe(col(col_cc_join_field))), how="inner", ) #df_known_customers.cache() print(df_known_customers.count()) #df_known_customers.explain(mode="formatted") df_known_customers.printSchema() df_known_customers.write.format("noop").mode("overwrite").save() print("known customers ok")
Full stacktrace:
--------------------------------------------------------------------------- Py4JJavaError Traceback (most recent call last) Cell In[5], line 105 103 #df_known_customers.explain(mode="formatted") 104 df_known_customers.printSchema() --> 105 df_known_customers.write.format("noop").mode("overwrite").save() 106 print("known customers ok") File ~/.conda/envs/demo_oktopuss_datalink-env_34/lib/python3.11/site-packages/pyspark/sql/readwriter.py:1396, in DataFrameWriter.save(self, path, format, mode, partitionBy, **options) 1394 self.format(format) 1395 if path is None: -> 1396 self._jwrite.save() 1397 else: 1398 self._jwrite.save(path) File ~/.conda/envs/demo_oktopuss_datalink-env_34/lib/python3.11/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args) 1316 command = proto.CALL_COMMAND_NAME +\ 1317 self.command_header +\ 1318 args_command +\ 1319 proto.END_COMMAND_PART 1321 answer = self.gateway_client.send_command(command) -> 1322 return_value = get_return_value( 1323 answer, self.gateway_client, self.target_id, self.name) 1325 for temp_arg in temp_args: 1326 if hasattr(temp_arg, "_detach"): File ~/.conda/envs/demo_oktopuss_datalink-env_34/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py:169, in capture_sql_exception.<locals>.deco(*a, **kw) 167 def deco(*a: Any, **kw: Any) -> Any: 168 try: --> 169 return f(*a, **kw) 170 except Py4JJavaError as e: 171 converted = convert_exception(e.java_exception) File ~/.conda/envs/demo_oktopuss_datalink-env_34/lib/python3.11/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name) 324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client) 325 if answer[1] == REFERENCE_TYPE: --> 326 raise Py4JJavaError( 327 "An error occurred while calling {0}{1}{2}.\n". 328 format(target_id, ".", name), value) 329 else: 330 raise Py4JError( 331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n". 332 format(target_id, ".", name, value)) Py4JJavaError: An error occurred while calling o800.save. : java.util.concurrent.ExecutionException: java.lang.NullPointerException at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:206) at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:209) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeBroadcast$1(SparkPlan.scala:208) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243) at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:204) at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doExecute(BroadcastHashJoinExec.scala:142) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:384) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:382) at org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec.writeWithV2(WriteToDataSourceV2Exec.scala:266) at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:360) at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:359) at org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec.run(WriteToDataSourceV2Exec.scala:266) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488) at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94) at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81) at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79) at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856) at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:318) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) at py4j.ClientServerConnection.run(ClientServerConnection.java:106) at java.lang.Thread.run(Thread.java:750) Caused by: java.lang.NullPointerException at org.apache.spark.util.io.ChunkedByteBuffer.$anonfun$getChunks$1(ChunkedByteBuffer.scala:181) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286) at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at scala.collection.TraversableLike.map(TraversableLike.scala:286) at scala.collection.TraversableLike.map$(TraversableLike.scala:279) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198) at org.apache.spark.util.io.ChunkedByteBuffer.getChunks(ChunkedByteBuffer.scala:181) at org.apache.spark.util.io.ChunkedByteBufferInputStream.<init>(ChunkedByteBuffer.scala:278) at org.apache.spark.util.io.ChunkedByteBuffer.toInputStream(ChunkedByteBuffer.scala:174) at org.apache.spark.sql.execution.SparkPlan.decodeUnsafeRows(SparkPlan.scala:409) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollectIterator$2(SparkPlan.scala:457) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) at org.apache.spark.sql.execution.joins.UnsafeHashedRelation$.apply(HashedRelation.scala:476) at org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:160) at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:1163) at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:1151) at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:148) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:217) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more
The key columns have the same type in both dfs.
There are no None's in the key columns in either dataframe.
The issue only appears for a specific psource partition; it doesn't appear for the other 'psource' partitions (which have the exact same schema).
If we only select a limited set of columns of the wide df before joining, there is no issue.
I tested with the exact same code/data against the following spark versions (these are the ones made available in our corporate environment):
- 3.2.0 => no issue
- 3.3.0 => no issue
- 3.4.0 => NullPointerException
- 3.5.3 => NullPointerException
I'd guess some kind of optimization of the execution plan was introduced in spark >=3.4 for wide dataframes, but fails in this specific case.
I can't share the actual dataframes, but I can share statistics upon request. I'm still trying to reproduce the issue with a dummy/fabricated dataframe (that I can share).