  1. Kudu
  2. KUDU-2518

SparkSQL queries without temporary tables


    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.7.1
    • Fix Version/s: None
    • Component/s: hms, spark

      Description

      One long-standing ergonomic issue with the Kudu/SparkSQL integration is that Kudu tables must be registered as temp tables before they can be scanned using a SQL string (sql("SELECT * FROM my_kudu_table")). Ideally, SparkSQL could query Kudu tables that it discovers via the HMS with no additional configuration. Yesterday I explored what it would take to get there and found some interesting things.
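
      For reference, the temp-table workaround described above can be sketched roughly as follows. This is a minimal sketch, assuming the kudu-spark artifact is on the classpath. The object and method names are hypothetical, "kudu.table" is the option key reported in the error message, and "kudu.master" is the option key used for the master addresses.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper object; not part of kudu-spark.
object KuduTempViewWorkaround {

  // Options the Kudu DefaultSource needs: master addresses and table name.
  def kuduOptions(masterAddresses: String, tableName: String): Map[String, String] =
    Map("kudu.master" -> masterAddresses, "kudu.table" -> tableName)

  // Loads the Kudu table as a DataFrame and registers it as a temp view,
  // so that it can afterwards be referenced from a SQL string.
  def registerKuduView(
      spark: SparkSession,
      masterAddresses: String,
      tableName: String,
      viewName: String): DataFrame = {
    val df = spark.read
      .format("org.apache.kudu.spark.kudu") // Spark appends ".DefaultSource"
      .options(kuduOptions(masterAddresses, tableName))
      .load()
    df.createOrReplaceTempView(viewName)
    df
  }
}
```

      In a Spark shell, calling KuduTempViewWorkaround.registerKuduView(spark, "kudu-master.example.com:7051", "my_kudu_table", "my_kudu_table") with a placeholder master address would then allow sql("SELECT * FROM my_kudu_table") to resolve. This per-session registration step is what the issue aims to eliminate.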

       

      If the HMS table contains a spark.sql.sources.provider table property with a value like org.apache.kudu.spark.kudu.DefaultSource, SparkSQL will automatically instantiate the corresponding RelationProvider class, passing a SQLContext and a map of parameters that it fills in with the table's HDFS URI and storage properties. The current plan for Kudu + HMS integration (KUDU-2191) is not to set any storage properties; instead, attributes like master addresses and table ID will be stored as table properties. As a result, SparkSQL instantiates a Kudu DefaultSource but doesn't pass necessary arguments like the table name or master addresses. Getting this far required adding a dummy org.apache.kudu.hive.KuduStorageHandler class to the classpath so that the Hive client wouldn't choke on the bogus class name. The stack trace from Spark attempting to instantiate the DefaultSource is provided below.

       

      Spark context Web UI available at http://kudu-hms-1.gce.cloudera.com:4041
      Spark context available as 'sc' (master = local[*], app id = local-1532719985143).
      Spark session available as 'spark'.
      Welcome to
            ____              __
           / __/__  ___ _____/ /__
          _\ \/ _ \/ _ `/ __/  '_/
         /___/ .__/\_,_/_/ /_/\_\   version 2.3.1
            /_/
               
      Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
      Type in expressions to have them evaluated.
      Type :help for more information.
      
      scala> sql("DESCRIBE TABLE t1")
      org.spark_project.guava.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Kudu table name must be specified in create options using key 'kudu.table'.  parameters: Map(), parameters-size: 0, parameters-keys: Set(), path: None
        at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2263)
        at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
        at org.spark_project.guava.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCachedPlan(SessionCatalog.scala:137)
        at org.apache.spark.sql.execution.datasources.FindDataSourceTable.org$apache$spark$sql$execution$datasources$FindDataSourceTable$$readDataSourceTable(DataSourceStrategy.scala:227)
        at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:264)
        at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:255)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
        at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:255)
        at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:223)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
        at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
        at scala.collection.immutable.List.foldLeft(List.scala:84)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:124)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:118)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:103)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
        at org.apache.spark.sql.SparkSession.table(SparkSession.scala:627)
        at org.apache.spark.sql.execution.command.DescribeTableCommand.run(tables.scala:548)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
        ... 49 elided
      Caused by: java.lang.IllegalArgumentException: Kudu table name must be specified in create options using key 'kudu.table'.  parameters: Map(), parameters-size: 0, parameters-keys: Set(), path: None
        at org.apache.kudu.spark.kudu.DefaultSource$$anonfun$11.apply(DefaultSource.scala:82)
        at org.apache.kudu.spark.kudu.DefaultSource$$anonfun$11.apply(DefaultSource.scala:82)
        at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
        at org.apache.spark.sql.catalyst.util.CaseInsensitiveMap.getOrElse(CaseInsensitiveMap.scala:28)
        at org.apache.kudu.spark.kudu.DefaultSource.createRelation(DefaultSource.scala:81)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:340)
        at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:242)
        at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:227)
        at org.spark_project.guava.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
        at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
        at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
        at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
        at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
        ... 96 more
      
      scala>
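
      The "Caused by" section of the trace comes down to a required-key lookup on an empty parameter map. A simplified stand-in for that lookup, not the actual DefaultSource code, might look like the sketch below. It uses a plain Map where Spark passes a CaseInsensitiveMap, and the object and method names are hypothetical.

```scala
// Simplified stand-in for the lookup at DefaultSource.scala:81-82 in the trace;
// this is an illustration, not the real Kudu source.
object MissingTableNameDemo {
  val TableKey = "kudu.table"

  // Returns the table name, or throws if the caller did not supply one.
  def tableName(parameters: Map[String, String]): String =
    parameters.getOrElse(
      TableKey,
      throw new IllegalArgumentException(
        s"Kudu table name must be specified in create options using key '$TableKey'. " +
          s"parameters: $parameters"))

  def main(args: Array[String]): Unit = {
    // What SparkSQL passed in the session above: an empty map.
    val fromHms = Map.empty[String, String]
    println(scala.util.Try(tableName(fromHms)).isFailure) // prints "true"

    // What the temp-table path passes: explicit options.
    println(tableName(Map(TableKey -> "t1"))) // prints "t1"
  }
}
```

      Because the table name and master addresses live in table properties rather than storage properties, nothing ever populates this map on the HMS-discovery path.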

       

      After striking out with the existing interfaces, I looked at the DataSourceV2 interfaces (combined with DataSourceRegister), which are part of the DataSourceV2 effort underway in Spark. It's not clear that this API actually provides more context when creating relations (we need the table name and master addresses from the table properties, and options are still just passed as a map in DataSourceOptions). More significantly, it doesn't appear that the spark.sql.sources.provider property works correctly with DataSourceV2 instances. The lookup fails with an AnalysisException because the class is not recognized as a valid Spark SQL data source:

       

      Spark context Web UI available at http://kudu-hms-1.gce.cloudera.com:4041
      Spark context available as 'sc' (master = local[*], app id = local-1532720634224).
      Spark session available as 'spark'.
      Welcome to
            ____              __
           / __/__  ___ _____/ /__
          _\ \/ _ \/ _ `/ __/  '_/
         /___/ .__/\_,_/_/ /_/\_\   version 2.3.1
            /_/
               
      Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
      Type in expressions to have them evaluated.
      Type :help for more information.
      
      scala> sql("DESCRIBE TABLE t1")
      org.apache.spark.sql.AnalysisException: org.apache.kudu.spark.KuduDataSource is not a valid Spark SQL Data Source.;
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:415)
        at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:242)
        at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:227)
        at org.spark_project.guava.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
        at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
        at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
        at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
        at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
        at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
        at org.spark_project.guava.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCachedPlan(SessionCatalog.scala:137)
        at org.apache.spark.sql.execution.datasources.FindDataSourceTable.org$apache$spark$sql$execution$datasources$FindDataSourceTable$$readDataSourceTable(DataSourceStrategy.scala:227)
        at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:264)
        at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:255)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
        at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:255)
        at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:223)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
        at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
        at scala.collection.immutable.List.foldLeft(List.scala:84)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:124)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:118)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:103)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
        at org.apache.spark.sql.SparkSession.table(SparkSession.scala:627)
        at org.apache.spark.sql.execution.command.DescribeTableCommand.run(tables.scala:548)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
        ... 49 elided
      
      scala>
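
      The failure above is consistent with the catalog lookup path dispatching on the V1 provider interfaces only. The sketch below is a simplified model of that kind of type match, assuming resolveRelation accepts only V1-style providers. The traits are local stand-ins for the real Spark interfaces, and all names in the object are hypothetical.

```scala
// Stand-in marker traits; the real interfaces live in org.apache.spark.sql.sources
// (V1) and org.apache.spark.sql.sources.v2 (V2). This models the dispatch only.
object ProviderDispatchDemo {
  trait RelationProvider // V1 entry point
  trait DataSourceV2     // V2 marker interface
  trait ReadSupport      // V2 read entry point

  class V1Source extends RelationProvider
  class V2Source extends DataSourceV2 with ReadSupport

  // Simplified model of a type match that only knows about V1 providers.
  def resolve(className: String, instance: AnyRef): Either[String, String] =
    instance match {
      case _: RelationProvider => Right(s"$className resolved as a V1 relation")
      case _                   => Left(s"$className is not a valid Spark SQL Data Source.")
    }

  def main(args: Array[String]): Unit = {
    println(resolve("V1Source", new V1Source).isRight) // prints "true"
    println(resolve("V2Source", new V2Source).isLeft)  // prints "true"
  }
}
```

      Under this model, a class that implements only the V2 interfaces falls through to the catch-all case, which matches the message in the trace.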

       

      org.apache.kudu.spark.KuduDataSource is a dummy class that I put on the classpath and set as the provider in the Hive metastore table properties:

       

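      package org.apache.kudu.spark

      import java.util

      import org.apache.spark.sql.Row
      import org.apache.spark.sql.sources.DataSourceRegister
      import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
      import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader}
      import org.apache.spark.sql.types.StructType

      // Dummy DataSourceV2 implementation (Spark 2.3 API); it only needs to be instantiable.
      class KuduDataSource extends DataSourceV2 with DataSourceRegister with ReadSupport {
        override def shortName(): String = "kudu"

        override def createReader(options: DataSourceOptions): DataSourceReader = {
          new KuduDataSourceReader(options)
        }
      }

      class KuduDataSourceReader(val options: DataSourceOptions) extends DataSourceReader {

        // Left unimplemented: the failure above occurs before either method is called.
        override def readSchema(): StructType = ???

        override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] = ???
      }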
      

       

              People

              • Assignee:
                Unassigned
                Reporter:
                danburkert Dan Burkert