Spark / SPARK-18598

Encoding a Java Bean with extra accessors produces an inconsistent Dataset, resulting in AssertionError


Details

    • Type: Bug
    • Status: Closed
    • Priority: Minor
    • Resolution: Abandoned
    • Affects Version/s: 2.0.2
    • Fix Version/s: None
    • Component/s: SQL
    • Labels: None

    Description

      Most operations on org.apache.spark.sql.Dataset throw java.lang.AssertionError when the Dataset is created with a Java bean Encoder and the bean has more accessors than properties.

      The following unit test demonstrates the steps to replicate:

      import org.apache.spark.sql.Dataset;
      import org.apache.spark.sql.Encoder;
      import org.apache.spark.sql.Encoders;
      import org.apache.spark.sql.SparkSession;
      import org.junit.Test;
      import org.xml.sax.SAXException;
      
      import java.io.IOException;
      
      import static java.util.Collections.singletonList;
      
      public class SparkBeanEncoderTest {
      
      
          public static class TestBean2 {
      
              private String name;
      
              public void setName(String name) {
                  this.name = name;
              }
      
              public String getName() {
                  return name;
              }
      
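              // Extra read-only accessor with no backing field or setter;
              // this is what triggers the bug.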
              public String getName2() {
                  return name.toLowerCase();
              }
          }
      
          @Test
          public void testCreateDatasetFromBeanFailure() throws IOException, SAXException {
      
              SparkSession spark = SparkSession
                      .builder()
                      .master("local")
                      .getOrCreate();
      
              TestBean2 bean = new TestBean2();
              bean.setName("testing123");
      
              Encoder<TestBean2> encoder = Encoders.bean(TestBean2.class);
      
              Dataset<TestBean2> dataset = spark.createDataset(singletonList(bean), encoder);
      
              dataset.show();
      
              spark.stop();
          }
      
      }
      

      Running the above produces the following output:

      16/11/27 14:00:04 INFO SparkContext: Running Spark version 2.0.2
      16/11/27 14:00:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
      16/11/27 14:00:04 WARN Utils: Your hostname, XXXX resolves to a loopback address: 127.0.1.1; using 192.168.1.68 instead (on interface eth0)
      16/11/27 14:00:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
      16/11/27 14:00:04 INFO SecurityManager: Changing view acls to: XXXX
      16/11/27 14:00:04 INFO SecurityManager: Changing modify acls to: XXXX
      16/11/27 14:00:04 INFO SecurityManager: Changing view acls groups to: 
      16/11/27 14:00:04 INFO SecurityManager: Changing modify acls groups to: 
      16/11/27 14:00:04 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(XXXX); groups with view permissions: Set(); users  with modify permissions: Set(XXXX); groups with modify permissions: Set()
      16/11/27 14:00:05 INFO Utils: Successfully started service 'sparkDriver' on port 34688.
      16/11/27 14:00:05 INFO SparkEnv: Registering MapOutputTracker
      16/11/27 14:00:05 INFO SparkEnv: Registering BlockManagerMaster
      16/11/27 14:00:05 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-0ae3a00f-eb46-4be2-8ece-1873f3db1a29
      16/11/27 14:00:05 INFO MemoryStore: MemoryStore started with capacity 3.0 GB
      16/11/27 14:00:05 INFO SparkEnv: Registering OutputCommitCoordinator
      16/11/27 14:00:05 INFO Utils: Successfully started service 'SparkUI' on port 4040.
      16/11/27 14:00:05 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.1.68:4040
      16/11/27 14:00:05 INFO Executor: Starting executor ID driver on host localhost
      16/11/27 14:00:05 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 42688.
      16/11/27 14:00:05 INFO NettyBlockTransferService: Server created on 192.168.1.68:42688
      16/11/27 14:00:05 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.1.68, 42688)
      16/11/27 14:00:05 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.1.68:42688 with 3.0 GB RAM, BlockManagerId(driver, 192.168.1.68, 42688)
      16/11/27 14:00:05 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.1.68, 42688)
      16/11/27 14:00:05 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
      16/11/27 14:00:05 INFO SharedState: Warehouse path is 'file:/home/hamish/git/language-identifier/wikidump/spark-warehouse'.
      16/11/27 14:00:05 INFO CodeGenerator: Code generated in 166.762154 ms
      16/11/27 14:00:06 INFO CodeGenerator: Code generated in 6.144958 ms
      
      java.lang.AssertionError: index (1) should < 1
      
      	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.assertIndexIsValid(UnsafeRow.java:133)
      	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.isNullAt(UnsafeRow.java:352)
      	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
      	at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$1.apply(LocalTableScanExec.scala:38)
      	at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$1.apply(LocalTableScanExec.scala:38)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
      	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
      	at org.apache.spark.sql.execution.LocalTableScanExec.<init>(LocalTableScanExec.scala:38)
      	at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:393)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:60)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:60)
      	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
      	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
      	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:61)
      	at org.apache.spark.sql.execution.SparkPlanner.plan(SparkPlanner.scala:47)
      	at org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1$$anonfun$apply$1.applyOrElse(SparkPlanner.scala:51)
      	at org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1$$anonfun$apply$1.applyOrElse(SparkPlanner.scala:48)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:308)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:308)
      	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:307)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
      	at org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1.apply(SparkPlanner.scala:48)
      	at org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1.apply(SparkPlanner.scala:48)
      	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
      	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:78)
      	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:76)
      	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:83)
      	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:83)
      	at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2572)
      	at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
      	at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
      	at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
      	at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
      	at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
      	at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
      	at SparkBeanEncoderTest.testCreateDatasetFromBeanFailure(SparkBeanEncoderTest.java:47)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:497)
      	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
      	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
      	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
      	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
      	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
      	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
      	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
      	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
      	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
      	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
      	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
      	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
      	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
      	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
      	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
      	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
      	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
      	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:497)
      	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
      
      16/11/27 14:00:06 INFO SparkContext: Invoking stop() from shutdown hook
      16/11/27 14:00:06 INFO SparkUI: Stopped Spark web UI at http://192.168.1.68:4040
      16/11/27 14:00:06 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
      16/11/27 14:00:06 INFO MemoryStore: MemoryStore cleared
      16/11/27 14:00:06 INFO BlockManager: BlockManager stopped
      16/11/27 14:00:06 INFO BlockManagerMaster: BlockManagerMaster stopped
      16/11/27 14:00:06 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
      16/11/27 14:00:06 INFO SparkContext: Successfully stopped SparkContext
      16/11/27 14:00:06 INFO ShutdownHookManager: Shutdown hook called
      16/11/27 14:00:06 INFO ShutdownHookManager: Deleting directory /tmp/spark-bad08a28-51bb-4295-a1e3-691d4679a56c
      

      The problem seems to be caused by an inconsistency in how bean properties are inspected in org.apache.spark.sql.catalyst.JavaTypeInference: sometimes the properties are filtered by the existence of both an accessor and a mutator, sometimes they are not. This inconsistency percolates up to org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, where the serializer ends up with a different field count from the schema.
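
      For what it's worth, the mismatch can be observed from user code without running an action, because the encoder exposes its inferred schema. A small diagnostic sketch, reusing the TestBean2 class and imports from the test above (my reading of which side ends up with the extra field may be off):

      // Inspect what the bean encoder thinks the row layout is. On 2.0.2 the
      // read-only property (name2) appears to be picked up during schema
      // inference but not by the serializer, which would be consistent with
      // the "index (1) should < 1" assertion seen at execution time.
      Encoder<TestBean2> encoder = Encoders.bean(TestBean2.class);
      System.out.println(encoder.schema().treeString());    // the inferred fields
      System.out.println(encoder.schema().fields().length); // field count per the schema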

      Desired behaviour here is debatable, but I'm pretty sure an AssertionError is always a bug. One simple fix would be to introduce a check so it fails faster and with a more helpful message. Personally, I'd quite like it to just work, even when there are extra accessors. To that end I've written a fix, which I shall PR shortly.
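
      Until then, a user-side workaround (an untested sketch on my part, not the proposed fix) is to keep the bean's readable and writable properties in sync, for example by renaming the extra accessor so Introspector no longer treats it as a bean property:

          public static class TestBean2 {

              private String name;

              public void setName(String name) {
                  this.name = name;
              }

              public String getName() {
                  return name;
              }

              // Renamed so it is no longer a JavaBean getter; the derived value
              // is still available, but the encoder only sees the 'name' property.
              public String lowerCaseName() {
                  return name.toLowerCase();
              }
          }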

            People

              Assignee: Unassigned
              Reporter: Hamish Morgan (hamish)
              Votes: 0
              Watchers: 2
