[SPARK-50475] NullPointerException when saving a df with 'noop' format after joining wide dfs


Details

    • Type: Bug
    • Status: Closed
    • Priority: Trivial
    • Resolution: Not A Bug
    • Affects Version/s: 3.4.0, 3.5.3
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels: None
    • Important

    Description

      When joining two dataframes (one of them very wide) and writing the result with the 'noop' format, Spark throws a NullPointerException.

       

      # Note: pp_join_field, party_identifier_attrib, col_pp_join_field and
      # df_unknown_customers are defined elsewhere in the original notebook.
      from pyspark.sql.functions import lit
      import logging
      from pyspark.sql.functions import col
      from pyspark.sql import functions as F

      PREFIX_PP = "__GDPR__PP__"

      path = "s3://XXX/my_dataset.parquet"
      pdate = "20241022"
      psource = "MY_PSOURCE"
      input_df = (
          spark.read.parquet(path + "/pdate=" + pdate + "/psource=" + psource)
          .withColumn("pdate", lit(pdate))
          .withColumn("psource", lit(psource))
      )
      df = input_df

      nr_cols = 20

      pp_df = spark.read.parquet("s3://XXX/my_privacy.parquet/pdate=20241125/psource=MY_PSOURCE/")
      party_privacy_columns = [f"col{i}" for i in range(nr_cols)] + [pp_join_field]

      pp_df = pp_df.select(*party_privacy_columns)
      for col_name in pp_df.columns:  # rename pp columns
          new_col_name = PREFIX_PP + col_name
          pp_df = pp_df.withColumnRenamed(col_name, new_col_name)

      data_controller_column = "CPN_NO"
      col_cc_join_field = f"{PREFIX_PP}CPN_NO"

      # Let's try to select only the columns needed to join
      print(len(df.columns))
      nr_cols_keep = 130  # Fails at 125
      #pp_df = pp_df.select(col(col_pp_join_field), col(col_cc_join_field))
      df = df.select(col(party_identifier_attrib), col(data_controller_column), *df.columns[:nr_cols_keep])

      print(df.where((col(party_identifier_attrib).isNull()) | (col(party_identifier_attrib) == "")).count())
      print(df.where((col(data_controller_column).isNull()) | (col(data_controller_column) == "")).count())

      # Let's try to drop records from pp_df that have null for col_pp_join_field or col_cc_join_field
      pp_df = pp_df.dropna(how='any', subset=[col_pp_join_field, col_cc_join_field])
      df = df.dropna(how='any', subset=[party_identifier_attrib, data_controller_column])

      print(df_unknown_customers.count())
      df_unknown_customers.printSchema()
      df_unknown_customers.write.format("noop").mode("overwrite").save()
      print("unknown customers ok")

      df_known_customers = df.join(
          pp_df,
          (col(party_identifier_attrib).eqNullSafe(col(col_pp_join_field)))
          & (col(data_controller_column).eqNullSafe(col(col_cc_join_field))),
          how="inner",
      )
      #df_known_customers.cache()
      print(df_known_customers.count())
      #df_known_customers.explain(mode="formatted")
      df_known_customers.printSchema()
      df_known_customers.write.format("noop").mode("overwrite").save()
      print("known customers ok")

      Full stacktrace:

      ---------------------------------------------------------------------------
      Py4JJavaError                             Traceback (most recent call last)
      Cell In[5], line 105
          103 #df_known_customers.explain(mode="formatted")
          104 df_known_customers.printSchema()
      --> 105 df_known_customers.write.format("noop").mode("overwrite").save()
          106 print("known customers ok")
      
      File ~/.conda/envs/demo_oktopuss_datalink-env_34/lib/python3.11/site-packages/pyspark/sql/readwriter.py:1396, in DataFrameWriter.save(self, path, format, mode, partitionBy, **options)
         1394     self.format(format)
         1395 if path is None:
      -> 1396     self._jwrite.save()
         1397 else:
         1398     self._jwrite.save(path)
      
      File ~/.conda/envs/demo_oktopuss_datalink-env_34/lib/python3.11/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
         1316 command = proto.CALL_COMMAND_NAME +\
         1317     self.command_header +\
         1318     args_command +\
         1319     proto.END_COMMAND_PART
         1321 answer = self.gateway_client.send_command(command)
      -> 1322 return_value = get_return_value(
         1323     answer, self.gateway_client, self.target_id, self.name)
         1325 for temp_arg in temp_args:
         1326     if hasattr(temp_arg, "_detach"):
      
      File ~/.conda/envs/demo_oktopuss_datalink-env_34/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py:169, in capture_sql_exception.<locals>.deco(*a, **kw)
          167 def deco(*a: Any, **kw: Any) -> Any:
          168     try:
      --> 169         return f(*a, **kw)
          170     except Py4JJavaError as e:
          171         converted = convert_exception(e.java_exception)
      
      File ~/.conda/envs/demo_oktopuss_datalink-env_34/lib/python3.11/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
          324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
          325 if answer[1] == REFERENCE_TYPE:
      --> 326     raise Py4JJavaError(
          327         "An error occurred while calling {0}{1}{2}.\n".
          328         format(target_id, ".", name), value)
          329 else:
          330     raise Py4JError(
          331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
          332         format(target_id, ".", name, value))
      
      Py4JJavaError: An error occurred while calling o800.save.
      : java.util.concurrent.ExecutionException: java.lang.NullPointerException
      	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
      	at java.util.concurrent.FutureTask.get(FutureTask.java:206)
      	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:209)
      	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeBroadcast$1(SparkPlan.scala:208)
      	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
      	at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:204)
      	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doExecute(BroadcastHashJoinExec.scala:142)
      	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
      	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
      	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
      	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:384)
      	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:382)
      	at org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec.writeWithV2(WriteToDataSourceV2Exec.scala:266)
      	at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:360)
      	at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:359)
      	at org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec.run(WriteToDataSourceV2Exec.scala:266)
      	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
      	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
      	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
      	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
      	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
      	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
      	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
      	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
      	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
      	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
      	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
      	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
      	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
      	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
      	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
      	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
      	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
      	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
      	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
      	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
      	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
      	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
      	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:318)
      	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
      	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
      	at py4j.Gateway.invoke(Gateway.java:282)
      	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
      	at py4j.commands.CallCommand.execute(CallCommand.java:79)
      	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
      	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
      	at java.lang.Thread.run(Thread.java:750)
      Caused by: java.lang.NullPointerException
      	at org.apache.spark.util.io.ChunkedByteBuffer.$anonfun$getChunks$1(ChunkedByteBuffer.scala:181)
      	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
      	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
      	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
      	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
      	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
      	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
      	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
      	at org.apache.spark.util.io.ChunkedByteBuffer.getChunks(ChunkedByteBuffer.scala:181)
      	at org.apache.spark.util.io.ChunkedByteBufferInputStream.<init>(ChunkedByteBuffer.scala:278)
      	at org.apache.spark.util.io.ChunkedByteBuffer.toInputStream(ChunkedByteBuffer.scala:174)
      	at org.apache.spark.sql.execution.SparkPlan.decodeUnsafeRows(SparkPlan.scala:409)
      	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollectIterator$2(SparkPlan.scala:457)
      	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
      	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
      	at org.apache.spark.sql.execution.joins.UnsafeHashedRelation$.apply(HashedRelation.scala:476)
      	at org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:160)
      	at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:1163)
      	at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:1151)
      	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:148)
      	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:217)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	... 1 more 

       

      The key columns have the same data type in both dfs.

      There are no nulls (None values) in the key columns of either dataframe.
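
      For reference, a minimal sketch of the checks used to verify this; the key names below are placeholders, not the real column names:

      # Placeholder key names; the real ones can't be shared.
      from pyspark.sql.functions import col

      left_key, right_key = "party_id", "__GDPR__PP__party_id"

      # Same data type on both sides of the join
      print(dict(df.dtypes)[left_key], dict(pp_df.dtypes)[right_key])

      # No nulls or empty strings in either key column
      print(df.where(col(left_key).isNull() | (col(left_key) == "")).count())
      print(pp_df.where(col(right_key).isNull() | (col(right_key) == "")).count())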

       

      The issue only appears for a specific psource partition; it doesn't appear for the other 'psource' partitions (which have the exact same schema).

       

       

      If we only select a limited set of columns of the wide df before joining, there is no issue.
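
      The workaround looks roughly like this (a sketch mirroring the repro code above, with the same variable names):

      # Sketch of the workaround: trim the wide dataframe to a limited set of
      # columns before the join (variable names as in the repro code above).
      keep_cols = df.columns[:50]  # well below the ~125-column point where it starts failing
      df_narrow = df.select(col(party_identifier_attrib), col(data_controller_column), *keep_cols)
      df_known_customers = df_narrow.join(
          pp_df,
          (col(party_identifier_attrib).eqNullSafe(col(col_pp_join_field)))
          & (col(data_controller_column).eqNullSafe(col(col_cc_join_field))),
          how="inner",
      )
      df_known_customers.write.format("noop").mode("overwrite").save()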

       

      I tested the exact same code/data against the following Spark versions (the ones available in our corporate environment):

      • 3.2.0 => no issue
      • 3.3.0 => no issue
      • 3.4.0 => NullPointerException
      • 3.5.3 => NullPointerException

      I'd guess some execution-plan optimization for wide dataframes was introduced in Spark >= 3.4 that fails in this specific case.
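
      The stack trace fails inside the broadcast hash join build (BroadcastExchangeExec / ChunkedByteBuffer), so one way to probe this guess is to disable broadcast joins and rerun the failing write. This is only a diagnostic sketch, not a fix:

      # Diagnostic sketch: force a non-broadcast join strategy and retry the write.
      # If the NPE disappears, the failure is in the broadcast exchange path seen above.
      spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)           # disable broadcast hash joins
      spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", -1)  # same for AQE (Spark 3.2+)
      df_known_customers.write.format("noop").mode("overwrite").save()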

       

      I can't share the actual dataframes, but I can share statistics upon request. I'm still trying to reproduce the issue with a dummy/fabricated dataframe (that I can share).
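
      The fabricated attempt is along these lines (a sketch with made-up data and column names that mirrors the shape of the real data, not its content):

      # Sketch of a fabricated reproduction: a wide dataframe (150 string columns)
      # joined to a narrow "privacy" dataframe on two key columns, then written as noop.
      from pyspark.sql import functions as F

      n_rows, n_wide_cols = 100_000, 150
      wide_df = spark.range(n_rows).select(
          F.col("id").cast("string").alias("party_id"),
          F.lit("DC001").alias("CPN_NO"),
          *[F.concat(F.lit(f"v{i}_"), F.col("id").cast("string")).alias(f"col{i}") for i in range(n_wide_cols)],
      )
      pp_dummy = spark.range(0, n_rows, 3).select(
          F.col("id").cast("string").alias("__GDPR__PP__party_id"),
          F.lit("DC001").alias("__GDPR__PP__CPN_NO"),
          *[F.lit(f"pp{i}").alias(f"__GDPR__PP__col{i}") for i in range(20)],
      )
      joined = wide_df.join(
          pp_dummy,
          (F.col("party_id").eqNullSafe(F.col("__GDPR__PP__party_id")))
          & (F.col("CPN_NO").eqNullSafe(F.col("__GDPR__PP__CPN_NO"))),
          how="inner",
      )
      joined.write.format("noop").mode("overwrite").save()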

       

Attachments

Activity

People

    Assignee: Unassigned
    Reporter: Cedric Cuypers
    Votes: 0
    Watchers: 1
