Details
Type: Improvement
Status: Resolved
Priority: Major
Resolution: Fixed
Fix Version: 4.0.0
Tested in Spark 3.3.0, 3.3.2.
Description
We are trying to create a Hive view using the following SQL command: "CREATE OR REPLACE VIEW yuting AS SELECT INFO_ANN FROM table_2611810".
Our table_2611810 has columns whose nested field names contain special characters such as "/". Here is the schema of this table:
contigName string
start bigint
end bigint
names array<string>
referenceAllele string
alternateAlleles array<string>
qual double
filters array<string>
splitFromMultiAllelic boolean
INFO_NCAMP int
INFO_ODDRATIO double
INFO_NM double
INFO_DBSNP_CAF array<string>
INFO_SPANPAIR int
INFO_TLAMP int
INFO_PSTD double
INFO_QSTD double
INFO_SBF double
INFO_AF array<double>
INFO_QUAL double
INFO_SHIFT3 int
INFO_VARBIAS string
INFO_HICOV int
INFO_PMEAN double
INFO_MSI double
INFO_VD int
INFO_DP int
INFO_HICNT int
INFO_ADJAF double
INFO_SVLEN int
INFO_RSEQ string
INFO_MSigDb array<string>
INFO_NMD array<string>
INFO_ANN array<struct<Allele:string,Annotation:array<string>,Annotation_Impact:string,Gene_Name:string,Gene_ID:string,Feature_Type:string,Feature_ID:string,Transcript_BioType:string,Rank:struct<rank:string,total:string>,HGVS_c:string,HGVS_p:string,cDNA_pos/cDNA_length:struct<pos:string,length:string>,CDS_pos/CDS_length:struct<pos:string,length:string>,AA_pos/AA_length:struct<pos:string,length:string>,Distance:int,ERRORS/WARNINGS/INFO:string>>
INFO_BIAS string
INFO_MQ double
INFO_HIAF double
INFO_END int
INFO_SPLITREAD int
INFO_GDAMP int
INFO_LSEQ string
INFO_LOF array<string>
INFO_SAMPLE string
INFO_AMPFLAG int
INFO_SN double
INFO_SVTYPE string
INFO_TYPE string
INFO_MSILEN double
INFO_DUPRATE double
INFO_DBSNP_COMMON int
INFO_REFBIAS string
genotypes array<struct<sampleId:string,alleleDepths:array<int>,ALD:array<int>,AF:array<double>,phased:boolean,calls:array<int>,VD:int,depth:int,RD:array<int>>>
You can see that the column INFO_ANN is an array of structs containing fields whose names include "/", such as "cDNA_pos/cDNA_length".
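For reference, a much smaller schema of the same shape can be built directly in spark-shell; on the affected versions the CREATE OR REPLACE VIEW step then fails with the same error. The table and view names below are made up for illustration, and a Hive-enabled SparkSession is assumed:

// Minimal sketch (hypothetical table/view names): build a column whose nested
// struct field name contains "/", save it as a table, then create a view on it.
import org.apache.spark.sql.functions.{array, lit, struct}

val df = spark.range(1).select(
  array(
    struct(
      lit("A").as("Allele"),
      struct(lit("1").as("pos"), lit("10").as("length")).as("cDNA_pos/cDNA_length")
    )
  ).as("INFO_ANN")
)

df.write.saveAsTable("table_repro")   // hypothetical table name
// Expected to fail on Spark 3.3.x with "Cannot recognize hive type string"
spark.sql("CREATE OR REPLACE VIEW view_repro AS SELECT INFO_ANN FROM table_repro")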
We believe this is the root cause of the following SparkException:
scala> val schema = spark.sql("CREATE OR REPLACE VIEW yuting AS SELECT INFO_ANN FROM table_2611810")
24/01/31 07:50:02.658 [main] WARN o.a.spark.sql.catalyst.util.package - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
org.apache.spark.SparkException: Cannot recognize hive type string: array<struct<Allele:string,Annotation:array<string>,Annotation_Impact:string,Gene_Name:string,Gene_ID:string,Feature_Type:string,Feature_ID:string,Transcript_BioType:string,Rank:struct<rank:string,total:string>,HGVS_c:string,HGVS_p:string,cDNA_pos/cDNA_length:struct<pos:string,length:string>,CDS_pos/CDS_length:struct<pos:string,length:string>,AA_pos/AA_length:struct<pos:string,length:string>,Distance:int,ERRORS/WARNINGS/INFO:string>>, column: INFO_ANN
  at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotRecognizeHiveTypeError(QueryExecutionErrors.scala:1455)
  at org.apache.spark.sql.hive.client.HiveClientImpl$.getSparkSQLDataType(HiveClientImpl.scala:1022)
  at org.apache.spark.sql.hive.client.HiveClientImpl$.$anonfun$verifyColumnDataType$1(HiveClientImpl.scala:1037)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at org.apache.spark.sql.types.StructType.foreach(StructType.scala:102)
  at org.apache.spark.sql.hive.client.HiveClientImpl$.org$apache$spark$sql$hive$client$HiveClientImpl$$verifyColumnDataType(HiveClientImpl.scala:1037)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$createTable$1(HiveClientImpl.scala:553)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:294)
  at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:225)
  at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:224)
  at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:274)
  at org.apache.spark.sql.hive.client.HiveClientImpl.createTable(HiveClientImpl.scala:552)
  at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$createTable$1(HiveExternalCatalog.scala:286)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:101)
  at org.apache.spark.sql.hive.HiveExternalCatalog.createTable(HiveExternalCatalog.scala:244)
  at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.createTable(ExternalCatalogWithListener.scala:94)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createTable(SessionCatalog.scala:373)
  at org.apache.spark.sql.execution.command.CreateViewCommand.run(views.scala:166)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
  at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
  ... 49 elided
Caused by: org.apache.spark.sql.catalyst.parser.ParseException: Syntax error at or near '/': extra input '/'(line 1, pos 247)

== SQL ==
array<struct<Allele:string,Annotation:array<string>,Annotation_Impact:string,Gene_Name:string,Gene_ID:string,Feature_Type:string,Feature_ID:string,Transcript_BioType:string,Rank:struct<rank:string,total:string>,HGVS_c:string,HGVS_p:string,cDNA_pos/cDNA_length:struct<pos:string,length:string>,CDS_pos/CDS_length:struct<pos:string,length:string>,AA_pos/AA_length:struct<pos:string,length:string>,Distance:int,ERRORS/WARNINGS/INFO:string>>
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------^^^

  at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:306)
  at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:143)
  at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseDataType(ParseDriver.scala:41)
  at org.apache.spark.sql.hive.client.HiveClientImpl$.getSparkSQLDataType(HiveClientImpl.scala:1019)
  ... 101 more
We have checked the recent code at https://github.com/apache/spark/blob/88f121c47778f0755862046d09484a83932cb30b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala#L1045, and it seems that if we start from our StructType (INFO_ANN) and generate a Hive column as a FieldSchema, we are still blocked in the function getSparkSQLDataType at line 1058.
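For reference, a simplified paraphrase of that path (not the exact Spark source) is: the Catalyst field is rendered with catalogString into a Hive FieldSchema, and the type string is later parsed back, which is where an unquoted "/" in a nested field name fails:

// Simplified paraphrase of the HiveClientImpl conversion path, for illustration only.
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.types.{DataType, StructField}

// Catalyst field -> Hive column: the type is stored as a plain catalogString.
def toHiveColumn(c: StructField): FieldSchema =
  new FieldSchema(c.name, c.dataType.catalogString, c.getComment().orNull)

// Hive column -> Catalyst type: the string is re-parsed, and names like
// cDNA_pos/cDNA_length make the parser throw a ParseException.
def getSparkSQLDataType(hc: FieldSchema): DataType =
  CatalystSqlParser.parseDataType(hc.getType)

This is consistent with what we see in the interactive session below: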
scala> val c = schema.fields(33)
c: org.apache.spark.sql.types.StructField = StructField(INFO_ANN,ArrayType(StructType(StructField(Allele,StringType,true),StructField(Annotation,ArrayType(StringType,true),true),StructField(Annotation_Impact,StringType,true),StructField(Gene_Name,StringType,true),StructField(Gene_ID,StringType,true),StructField(Feature_Type,StringType,true),StructField(Feature_ID,StringType,true),StructField(Transcript_BioType,StringType,true),StructField(Rank,StructType(StructField(rank,StringType,true),StructField(total,StringType,true)),true),StructField(HGVS_c,StringType,true),StructField(HGVS_p,StringType,true),StructField(cDNA_pos/cDNA_length,StructType(StructField(pos,StringType,true),StructField(length,StringType,true)),true),StructField(CDS_pos/CDS_length,StructType(St...

scala> c.dataType.catalogString
res9: String = array<struct<Allele:string,Annotation:array<string>,Annotation_Impact:string,Gene_Name:string,Gene_ID:string,Feature_Type:string,Feature_ID:string,Transcript_BioType:string,Rank:struct<rank:string,total:string>,HGVS_c:string,HGVS_p:string,cDNA_pos/cDNA_length:struct<pos:string,length:string>,CDS_pos/CDS_length:struct<pos:string,length:string>,AA_pos/AA_length:struct<pos:string,length:string>,Distance:int,ERRORS/WARNINGS/INFO:string>>

scala> val f = new FieldSchema(c.name, c.dataType.catalogString, c.getComment().orNull)
f: org.apache.hadoop.hive.metastore.api.FieldSchema = FieldSchema(name:INFO_ANN, type:array<struct<Allele:string,Annotation:array<string>,Annotation_Impact:string,Gene_Name:string,Gene_ID:string,Feature_Type:string,Feature_ID:string,Transcript_BioType:string,Rank:struct<rank:string,total:string>,HGVS_c:string,HGVS_p:string,cDNA_pos/cDNA_length:struct<pos:string,length:string>,CDS_pos/CDS_length:struct<pos:string,length:string>,AA_pos/AA_length:struct<pos:string,length:string>,Distance:int,ERRORS/WARNINGS/INFO:string>>, comment:null)

scala> CatalystSqlParser.parseDataType(f.getType)
org.apache.spark.sql.catalyst.parser.ParseException: Syntax error at or near '/': extra input '/'(line 1, pos 247)

== SQL ==
array<struct<Allele:string,Annotation:array<string>,Annotation_Impact:string,Gene_Name:string,Gene_ID:string,Feature_Type:string,Feature_ID:string,Transcript_BioType:string,Rank:struct<rank:string,total:string>,HGVS_c:string,HGVS_p:string,cDNA_pos/cDNA_length:struct<pos:string,length:string>,CDS_pos/CDS_length:struct<pos:string,length:string>,AA_pos/AA_length:struct<pos:string,length:string>,Distance:int,ERRORS/WARNINGS/INFO:string>>
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------^^^

  at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:306)
  at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:143)
  at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseDataType(ParseDriver.scala:41)
  ... 49 elided
Possible Solution:
Would it be possible to generate c.dataType.catalogString with backtick-quoted field names, such as:
scala> c.dataType.catalogString
res9: String = array<struct<Allele:string,Annotation:array<string>,Annotation_Impact:string,Gene_Name:string,Gene_ID:string,Feature_Type:string,Feature_ID:string,Transcript_BioType:string,Rank:struct<rank:string,total:string>,HGVS_c:string,HGVS_p:string,`cDNA_pos/cDNA_length`:struct<pos:string,length:string>,`CDS_pos/CDS_length`:struct<pos:string,length:string>,`AA_pos/AA_length`:struct<pos:string,length:string>,Distance:int,ERRORS/WARNINGS/INFO:string>>
Then there would not be any SparkException when calling CatalystSqlParser.parseDataType(f.getType).
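As a quick sanity check, a backtick-quoted type string does round-trip through the parser. The quoteIfNeeded helper below is hand-rolled for illustration only and is not Spark's internal quoting logic:

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// Hypothetical helper: backtick-quote any field name the data type parser
// cannot accept unquoted (e.g. names containing "/").
def quoteIfNeeded(name: String): String =
  if (name.matches("[A-Za-z0-9_]+")) name else s"`${name.replace("`", "``")}`"

val typeStr =
  s"struct<HGVS_c:string,${quoteIfNeeded("cDNA_pos/cDNA_length")}:struct<pos:string,length:string>>"

// Parses successfully once the special-character field name is quoted.
CatalystSqlParser.parseDataType(typeStr)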
Thanks in advance for your help.