Details
Description
Using limit on a DataFrame prior to groupBy leads to a crash (a NullPointerException thrown from generated code). Coalescing or repartitioning the DataFrame first avoids the crash.
will crash: df.limit(3).groupBy("user_id").count().show()
will work: df.limit(3).coalesce(1).groupBy("user_id").count().show()
will work: df.limit(3).repartition("user_id").groupBy("user_id").count().show()
Here is a reproducible example along with the error message:
>>> df = spark.createDataFrame([ (1, 1), (1, 3), (2, 1), (3, 2), (3, 3) ], ["user_id", "genre_id"])
>>>
>>> df.show()
+-------+--------+
|user_id|genre_id|
+-------+--------+
|      1|       1|
|      1|       3|
|      2|       1|
|      3|       2|
|      3|       3|
+-------+--------+
>>> df.groupBy("user_id").count().show()
+-------+-----+
|user_id|count|
+-------+-----+
|      1|    2|
|      3|    2|
|      2|    1|
+-------+-----+
>>> df.limit(3).groupBy("user_id").count().show()
[Stage 8:===================================================>(1964 + 24) / 2000]
16/11/21 01:59:27 WARN TaskSetManager: Lost task 0.0 in stage 9.0 (TID 8204, lvsp20hdn012.stubprod.com): java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
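For reference, applying either workaround from above to the same DataFrame avoids the NPE (a minimal sketch against the reproduction; output rows omitted since their order may vary):
>>> df.limit(3).coalesce(1).groupBy("user_id").count().show()
>>> df.limit(3).repartition("user_id").groupBy("user_id").count().show()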
Issue Links
is duplicated by:
- SPARK-18851 DataSet Limit into Aggregate Results in NPE in Codegen (Resolved)
- SPARK-19037 Run count(distinct x) from sub query found some errors (Resolved)