Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Affects Version/s: 2.0.0
- Labels: None
Description
Hello,
Here is a crash in Spark SQL joins, with a minimal reproducible test case. Interestingly, it only seems to happen when the data is read back from Parquet (I added a crash = True variable to toggle this).
This is a left_outer example, but it also crashes with a regular inner join (see the variant sketched after the script).
import os

from pyspark import SparkContext
from pyspark.sql import types as SparkTypes
from pyspark.sql import SQLContext

sc = SparkContext()
sqlc = SQLContext(sc)

schema1 = SparkTypes.StructType([
    SparkTypes.StructField("id1", SparkTypes.LongType(), nullable=True)
])
schema2 = SparkTypes.StructType([
    SparkTypes.StructField("id2", SparkTypes.LongType(), nullable=True)
])

# Valid Long values:
# -9223372036854775808 < -5543241376386463808
#  4661454128115150227 <  9223372036854775807
data1 = [(4661454128115150227,), (-5543241376386463808,)]
data2 = [(650460285,)]

df1 = sqlc.createDataFrame(sc.parallelize(data1), schema1)
df2 = sqlc.createDataFrame(sc.parallelize(data2), schema2)

crash = True
if crash:
    os.system("rm -rf /tmp/sparkbug")
    df1.write.parquet("/tmp/sparkbug/vertex")
    df2.write.parquet("/tmp/sparkbug/edge")
    df1 = sqlc.read.load("/tmp/sparkbug/vertex")
    df2 = sqlc.read.load("/tmp/sparkbug/edge")

result_df = df2.join(df1, on=(df1.id1 == df2.id2), how="left_outer")

# Should print [Row(id2=650460285, id1=None)]
print(result_df.collect())
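For completeness, here is the inner-join variant mentioned above; only the how argument changes, the rest of the script is identical (this snippet is my illustration, not a separate repro):

# Inner-join variant of the same join; it crashes the same way once the
# data has been round-tripped through Parquet.
result_df = df2.join(df1, on=(df1.id1 == df2.id2), how="inner")
# With crash = False this prints [] (the keys do not match), as expected.
print(result_df.collect())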
When run with spark-submit, the final collect() call crashes with the following stack trace:
py4j.protocol.Py4JJavaError: An error occurred while calling o61.collectToPython.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:120)
    at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:229)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:124)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:242)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:83)
    at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
    at org.apache.spark.sql.execution.BatchedDataSourceScanExec.consume(ExistingRDD.scala:225)
    at org.apache.spark.sql.execution.BatchedDataSourceScanExec.doProduce(ExistingRDD.scala:328)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
    at org.apache.spark.sql.execution.BatchedDataSourceScanExec.produce(ExistingRDD.scala:225)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:77)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:38)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:309)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:347)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
    at org.apache.spark.sql.Dataset.javaToPython(Dataset.scala:2507)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2513)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2513)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2513)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2512)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:211)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NegativeArraySizeException
    at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.optimize(HashedRelation.scala:619)
    at org.apache.spark.sql.execution.joins.LongHashedRelation$.apply(HashedRelation.scala:806)
    at org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:105)
    at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:816)
    at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:812)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:90)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:72)
    at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:94)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more
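The root cause is the NegativeArraySizeException raised in LongToUnsafeRowMap.optimize. My guess (an assumption based on the stack trace, not verified against the Spark source) is that optimize() sizes a dense lookup array from the span between the smallest and largest join keys, and with the two id1 values above that span overflows a 64-bit long and comes out negative. A minimal sketch of the suspected arithmetic, emulating Java long wraparound in Python (max_key/min_key are hypothetical names; the values are the two id1 values from the repro data):

def to_java_long(n):
    # Wrap an arbitrary Python int to a signed 64-bit value, as Java
    # long arithmetic would.
    n &= (1 << 64) - 1
    return n - (1 << 64) if n >= (1 << 63) else n

max_key = 4661454128115150227   # largest id1 in the repro data
min_key = -5543241376386463808  # smallest id1 in the repro data

key_range = to_java_long(max_key - min_key)
print(key_range)  # -8242048569207937581: negative, so allocating an array
                  # sized from it would raise NegativeArraySizeException

If that reading is right, a possible workaround (untested here) is to keep Spark from building the broadcast hash relation at all by disabling broadcast joins:

sqlc.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")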