Details
- Type: Bug
- Status: Closed
- Priority: Minor
- Resolution: Abandoned
- Affects Version/s: 2.0.2
- Fix Version/s: None
- Component/s: None
Description
Most operations on org.apache.spark.sql.Dataset throw java.lang.AssertionError when the Dataset was created with a Java bean Encoder where the bean has more accessors (getters) than writable properties.
The following unit test demonstrates the steps to reproduce:
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoder;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SparkSession;
    import org.junit.Test;
    import org.xml.sax.SAXException;

    import java.io.IOException;

    import static java.util.Collections.singletonList;

    public class SparkBeanEncoderTest {

        public static class TestBean2 {
            private String name;

            public void setName(String name) { this.name = name; }

            public String getName() { return name; }

            // Extra accessor with no matching setter or backing property:
            // this is what trips up the bean encoder.
            public String getName2() { return name.toLowerCase(); }
        }

        @Test
        public void testCreateDatasetFromBeanFailure() throws IOException, SAXException {
            SparkSession spark = SparkSession
                    .builder()
                    .master("local")
                    .getOrCreate();

            TestBean2 bean = new TestBean2();
            bean.setName("testing123");

            Encoder<TestBean2> encoder = Encoders.bean(TestBean2.class);
            Dataset<TestBean2> dataset = spark.createDataset(singletonList(bean), encoder);
            dataset.show();  // throws java.lang.AssertionError

            spark.stop();
        }
    }
Running the above produces the following output:
    16/11/27 14:00:04 INFO SparkContext: Running Spark version 2.0.2
    [... startup INFO/WARN lines trimmed ...]
    16/11/27 14:00:05 INFO CodeGenerator: Code generated in 166.762154 ms
    16/11/27 14:00:06 INFO CodeGenerator: Code generated in 6.144958 ms
    java.lang.AssertionError: index (1) should < 1
        at org.apache.spark.sql.catalyst.expressions.UnsafeRow.assertIndexIsValid(UnsafeRow.java:133)
        at org.apache.spark.sql.catalyst.expressions.UnsafeRow.isNullAt(UnsafeRow.java:352)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$1.apply(LocalTableScanExec.scala:38)
        at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$1.apply(LocalTableScanExec.scala:38)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.execution.LocalTableScanExec.<init>(LocalTableScanExec.scala:38)
        at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:393)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:60)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:60)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:61)
        at org.apache.spark.sql.execution.SparkPlanner.plan(SparkPlanner.scala:47)
        at org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1$$anonfun$apply$1.applyOrElse(SparkPlanner.scala:51)
        at org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1$$anonfun$apply$1.applyOrElse(SparkPlanner.scala:48)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:308)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:308)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:307)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
        at org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1.apply(SparkPlanner.scala:48)
        at org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1.apply(SparkPlanner.scala:48)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:78)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:76)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:83)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:83)
        at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2572)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
        at SparkBeanEncoderTest.testCreateDatasetFromBeanFailure(SparkBeanEncoderTest.java:47)
        [... JUnit and IDE runner frames trimmed ...]
    [... shutdown INFO lines trimmed ...]
The problem seems to be caused by an inconsistency in the way bean properties are inspected in org.apache.spark.sql.catalyst.JavaTypeInference: some code paths filter properties by the presence of both an accessor and a mutator, others do not. This inconsistency propagates to org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, where the serializer ends up with a different field count from the schema.
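To see the mismatch concretely, here is a small self-contained sketch (my own illustration, not Spark source) using the same java.beans introspection machinery that JavaTypeInference relies on. It shows that TestBean2 exposes a read-only "name2" property, so a code path that filters on "has a setter" counts one field fewer than a code path that does not:

    import java.beans.Introspector;
    import java.beans.PropertyDescriptor;

    public class BeanIntrospectionDemo {
        // Same shape as TestBean2 from the test above.
        public static class TestBean2 {
            private String name;
            public void setName(String name) { this.name = name; }
            public String getName() { return name; }
            public String getName2() { return name.toLowerCase(); }
        }

        public static void main(String[] args) throws Exception {
            for (PropertyDescriptor pd :
                    Introspector.getBeanInfo(TestBean2.class).getPropertyDescriptors()) {
                if ("class".equals(pd.getName())) continue;  // inherited from Object
                System.out.println(pd.getName()
                        + ": readable=" + (pd.getReadMethod() != null)
                        + ", writable=" + (pd.getWriteMethod() != null));
            }
            // Prints:
            //   name: readable=true, writable=true
            //   name2: readable=true, writable=false
        }
    }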
Desired behaviour here is debatable, but I'm pretty sure AssertionErrors are always a bug. One simple fix would be to introduce a check so that it fails faster, with a more helpful message. Personally, I'd quite like it to just work, even when there are too many accessors. To that end I've written a fix, which I shall PR shortly.
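In the meantime, a user-side workaround is to avoid bean-style naming for derived accessors, so the introspector never reports a read-only property in the first place. A hypothetical reworking of TestBean2 (not part of any Spark fix):

    public static class TestBean2Workaround {
        private String name;

        public void setName(String name) { this.name = name; }

        public String getName() { return name; }

        // Renamed from getName2(): without the "get" prefix the bean
        // introspector no longer reports a read-only "name2" property,
        // so the encoder's serializer and schema agree on the field
        // count and dataset.show() succeeds.
        public String lowerCaseName() {
            return name.toLowerCase();
        }
    }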
Issue Links
- is duplicated by: SPARK-20374 Encoder generated using Java beans causes corruption in MapGroupsWithState (Closed)
- links to