Details
-
Improvement
-
Status: Open
-
Major
-
Resolution: Unresolved
-
1.7.1
-
None
Description
One long-standing ergonomic issue with the Kudu/SparkSQL integration is the requirement to register Kudu tables as temp tables before they can be scanned using a SQL string (sql("SELECT * FROM my_kudu_table")). Ideally SparkSQL could query Kudu tables that it discovers via the HMS with no additional configuration. Yesterday I explored what it would take to get there, and I found some interesting things.
If the HMS table contains a spark.sql.sources.provider table property with a value like org.apache.kudu.spark.kudu.DefaultSource, SparkSQL will automatically instantiate the corresponding RelationProvider class, passing a SQLContext and a map of parameters, which it fills in with the table's HDFS URI, and storage properties. The current plan for Kudu + HMS integration (KUDU-2191) is not to set any storage properties, instead attributes like master addresses and table ID will be stored as table properties. As a result, SparkSQL is instantiating a Kudu DefaultSource, but it doesn't pass necessary arguments like the table name or master addresses. Getting this far required adding a dummy org.apache.kudu.hive.KuduStorageHandler class to the classpath so that the Hive client wouldn't choke on the bogus class name. The stacktrace from Spark attempting to instantiate the DefaultSource is provided below.
Spark context Web UI available at http://kudu-hms-1.gce.cloudera.com:4041 Spark context available as 'sc' (master = local[*], app id = local-1532719985143). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.3.1 /_/ Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_181) Type in expressions to have them evaluated. Type :help for more information. scala> sql("DESCRIBE TABLE t1") org.spark_project.guava.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Kudu table name must be specified in create options using key 'kudu.table'. parameters: Map(), parameters-size: 0, parameters-keys: Set(), path: None at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2263) at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000) at org.spark_project.guava.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCachedPlan(SessionCatalog.scala:137) at org.apache.spark.sql.execution.datasources.FindDataSourceTable.org$apache$spark$sql$execution$datasources$FindDataSourceTable$$readDataSourceTable(DataSourceStrategy.scala:227) at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:264) at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:255) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187) at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272) at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256) at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:255) at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:223) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84) at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124) at scala.collection.immutable.List.foldLeft(List.scala:84) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76) at scala.collection.immutable.List.foreach(List.scala:381) at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76) at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:124) at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:118) at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:103) at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57) at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55) at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74) at org.apache.spark.sql.SparkSession.table(SparkSession.scala:627) at org.apache.spark.sql.execution.command.DescribeTableCommand.run(tables.scala:548) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79) at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190) at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190) at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253) at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641) ... 49 elided Caused by: java.lang.IllegalArgumentException: Kudu table name must be specified in create options using key 'kudu.table'. parameters: Map(), parameters-size: 0, parameters-keys: Set(), path: None at org.apache.kudu.spark.kudu.DefaultSource$$anonfun$11.apply(DefaultSource.scala:82) at org.apache.kudu.spark.kudu.DefaultSource$$anonfun$11.apply(DefaultSource.scala:82) at scala.collection.MapLike$class.getOrElse(MapLike.scala:128) at org.apache.spark.sql.catalyst.util.CaseInsensitiveMap.getOrElse(CaseInsensitiveMap.scala:28) at org.apache.kudu.spark.kudu.DefaultSource.createRelation(DefaultSource.scala:81) at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:340) at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:242) at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:227) at org.spark_project.guava.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792) at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342) at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257) ... 96 more scala>
After striking out with the existing interfaces I looked at the DataSourceRegister API which is a part of the DataSourceV2 effort underway in Spark. It's not clear that this API actually provides more context when creating relations (we need table name and master addresses from the table properties and options are still just passed as a map in DataSourceOptions), but more significantly it doesn't appear that the spark.sql.sources.provider property works correctly with DataSourceV2 instances, it gives a class cast issue:
Spark context Web UI available at http://kudu-hms-1.gce.cloudera.com:4041 Spark context available as 'sc' (master = local[*], app id = local-1532720634224). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.3.1 /_/ Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_181) Type in expressions to have them evaluated. Type :help for more information. scala> sql("DESCRIBE TABLE t1") org.apache.spark.sql.AnalysisException: org.apache.kudu.spark.KuduDataSource is not a valid Spark SQL Data Source.; at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:415) at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:242) at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:227) at org.spark_project.guava.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792) at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342) at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257) at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000) at org.spark_project.guava.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCachedPlan(SessionCatalog.scala:137) at org.apache.spark.sql.execution.datasources.FindDataSourceTable.org$apache$spark$sql$execution$datasources$FindDataSourceTable$$readDataSourceTable(DataSourceStrategy.scala:227) at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:264) at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:255) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187) at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272) at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256) at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:255) at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:223) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84) at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124) at scala.collection.immutable.List.foldLeft(List.scala:84) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76) at scala.collection.immutable.List.foreach(List.scala:381) at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76) at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:124) at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:118) at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:103) at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57) at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55) at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74) at org.apache.spark.sql.SparkSession.table(SparkSession.scala:627) at org.apache.spark.sql.execution.command.DescribeTableCommand.run(tables.scala:548) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79) at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190) at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190) at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253) at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641) ... 49 elided scala>
org.apache.kudu.spark.KuduDataSource is a dummy class I put on the classpath and added to the Hive metastore table attribute:
class KuduDataSource extends DataSourceV2 with DataSourceRegister with ReadSupport { override def shortName(): String = "kudu" override def createReader(options: DataSourceOptions): DataSourceReader = { new KuduDataSourceReader(options) } } class KuduDataSourceReader(val options: DataSourceOptions) extends DataSourceReader { override def readSchema(): StructType = ??? override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] = ??? }
Attachments
Issue Links
- relates to
-
KUDU-2490 implement Kudu DataSourceV2 and related classes
- Open