  1. Spark
  2. SPARK-46934

Read/write roundtrip for struct type with special characters with HMS

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 4.0.0
    • Fix Version/s: 4.0.0
    • Component/s: SQL
    • Environment: Tested in Spark 3.3.0, 3.3.2.

    Description

      We are trying to create a Hive view using the following SQL command: "CREATE OR REPLACE VIEW yuting AS SELECT INFO_ANN FROM table_2611810".

      Our table_2611810 has certain columns whose nested field names contain special characters such as "/". Here is the schema of this table:

      contigName              string
      start                   bigint
      end                     bigint
      names                   array<string>
      referenceAllele         string
      alternateAlleles        array<string>
      qual                    double
      filters                 array<string>
      splitFromMultiAllelic    boolean
      INFO_NCAMP              int
      INFO_ODDRATIO           double
      INFO_NM                 double
      INFO_DBSNP_CAF          array<string>
      INFO_SPANPAIR           int
      INFO_TLAMP              int
      INFO_PSTD               double
      INFO_QSTD               double
      INFO_SBF                double
      INFO_AF                 array<double>
      INFO_QUAL               double
      INFO_SHIFT3             int
      INFO_VARBIAS            string
      INFO_HICOV              int
      INFO_PMEAN              double
      INFO_MSI                double
      INFO_VD                 int
      INFO_DP                 int
      INFO_HICNT              int
      INFO_ADJAF              double
      INFO_SVLEN              int
      INFO_RSEQ               string
      INFO_MSigDb             array<string>
      INFO_NMD                array<string>
      INFO_ANN                array<struct<Allele:string,Annotation:array<string>,Annotation_Impact:string,Gene_Name:string,Gene_ID:string,Feature_Type:string,Feature_ID:string,Transcript_BioType:string,Rank:struct<rank:string,total:string>,HGVS_c:string,HGVS_p:string,cDNA_pos/cDNA_length:struct<pos:string,length:string>,CDS_pos/CDS_length:struct<pos:string,length:string>,AA_pos/AA_length:struct<pos:string,length:string>,Distance:int,ERRORS/WARNINGS/INFO:string>>
      INFO_BIAS               string
      INFO_MQ                 double
      INFO_HIAF               double
      INFO_END                int
      INFO_SPLITREAD          int
      INFO_GDAMP              int
      INFO_LSEQ               string
      INFO_LOF                array<string>
      INFO_SAMPLE             string
      INFO_AMPFLAG            int
      INFO_SN                 double
      INFO_SVTYPE             string
      INFO_TYPE               string
      INFO_MSILEN             double
      INFO_DUPRATE            double
      INFO_DBSNP_COMMON       int
      INFO_REFBIAS            string
      genotypes               array<struct<sampleId:string,alleleDepths:array<int>,ALD:array<int>,AF:array<double>,phased:boolean,calls:array<int>,VD:int,depth:int,RD:array<int>>> 

      Column INFO_ANN is an array of structs, and several of its struct fields have names containing "/", such as "cDNA_pos/cDNA_length", "CDS_pos/CDS_length", "AA_pos/AA_length", and "ERRORS/WARNINGS/INFO".

      We believe this is the root cause of the following SparkException:

      scala> val schema = spark.sql("CREATE OR REPLACE VIEW yuting AS SELECT INFO_ANN FROM table_2611810")
      24/01/31 07:50:02.658 [main] WARN  o.a.spark.sql.catalyst.util.package - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
      org.apache.spark.SparkException: Cannot recognize hive type string: array<struct<Allele:string,Annotation:array<string>,Annotation_Impact:string,Gene_Name:string,Gene_ID:string,Feature_Type:string,Feature_ID:string,Transcript_BioType:string,Rank:struct<rank:string,total:string>,HGVS_c:string,HGVS_p:string,cDNA_pos/cDNA_length:struct<pos:string,length:string>,CDS_pos/CDS_length:struct<pos:string,length:string>,AA_pos/AA_length:struct<pos:string,length:string>,Distance:int,ERRORS/WARNINGS/INFO:string>>, column: INFO_ANN
        at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotRecognizeHiveTypeError(QueryExecutionErrors.scala:1455)
        at org.apache.spark.sql.hive.client.HiveClientImpl$.getSparkSQLDataType(HiveClientImpl.scala:1022)
        at org.apache.spark.sql.hive.client.HiveClientImpl$.$anonfun$verifyColumnDataType$1(HiveClientImpl.scala:1037)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at org.apache.spark.sql.types.StructType.foreach(StructType.scala:102)
        at org.apache.spark.sql.hive.client.HiveClientImpl$.org$apache$spark$sql$hive$client$HiveClientImpl$$verifyColumnDataType(HiveClientImpl.scala:1037)
        at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$createTable$1(HiveClientImpl.scala:553)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:294)
        at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:225)
        at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:224)
        at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:274)
        at org.apache.spark.sql.hive.client.HiveClientImpl.createTable(HiveClientImpl.scala:552)
        at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$createTable$1(HiveExternalCatalog.scala:286)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:101)
        at org.apache.spark.sql.hive.HiveExternalCatalog.createTable(HiveExternalCatalog.scala:244)
        at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.createTable(ExternalCatalogWithListener.scala:94)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createTable(SessionCatalog.scala:373)
        at org.apache.spark.sql.execution.command.CreateViewCommand.run(views.scala:166)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
        ... 49 elided
      Caused by: org.apache.spark.sql.catalyst.parser.ParseException:
      Syntax error at or near '/': extra input '/'(line 1, pos 247)

      == SQL ==
      array<struct<Allele:string,Annotation:array<string>,Annotation_Impact:string,Gene_Name:string,Gene_ID:string,Feature_Type:string,Feature_ID:string,Transcript_BioType:string,Rank:struct<rank:string,total:string>,HGVS_c:string,HGVS_p:string,cDNA_pos/cDNA_length:struct<pos:string,length:string>,CDS_pos/CDS_length:struct<pos:string,length:string>,AA_pos/AA_length:struct<pos:string,length:string>,Distance:int,ERRORS/WARNINGS/INFO:string>>
      -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------^^^  at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:306)
        at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:143)
        at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseDataType(ParseDriver.scala:41)
        at org.apache.spark.sql.hive.client.HiveClientImpl$.getSparkSQLDataType(HiveClientImpl.scala:1019)
        ... 101 more 

      We checked the recent code at https://github.com/apache/spark/blob/88f121c47778f0755862046d09484a83932cb30b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala#L1045. It seems that even if we start from our StructType (INFO_ANN) and generate a Hive column as a FieldSchema, we are still blocked in the function getSparkSQLDataType at line 1058.

      scala> val c = schema.fields(33)
      c: org.apache.spark.sql.types.StructField = StructField(INFO_ANN,ArrayType(StructType(StructField(Allele,StringType,true),StructField(Annotation,ArrayType(StringType,true),true),StructField(Annotation_Impact,StringType,true),StructField(Gene_Name,StringType,true),StructField(Gene_ID,StringType,true),StructField(Feature_Type,StringType,true),StructField(Feature_ID,StringType,true),StructField(Transcript_BioType,StringType,true),StructField(Rank,StructType(StructField(rank,StringType,true),StructField(total,StringType,true)),true),StructField(HGVS_c,StringType,true),StructField(HGVS_p,StringType,true),StructField(cDNA_pos/cDNA_length,StructType(StructField(pos,StringType,true),StructField(length,StringType,true)),true),StructField(CDS_pos/CDS_length,StructType(St...
      
      scala> c.dataType.catalogString
      res9: String = array<struct<Allele:string,Annotation:array<string>,Annotation_Impact:string,Gene_Name:string,Gene_ID:string,Feature_Type:string,Feature_ID:string,Transcript_BioType:string,Rank:struct<rank:string,total:string>,HGVS_c:string,HGVS_p:string,cDNA_pos/cDNA_length:struct<pos:string,length:string>,CDS_pos/CDS_length:struct<pos:string,length:string>,AA_pos/AA_length:struct<pos:string,length:string>,Distance:int,ERRORS/WARNINGS/INFO:string>>
      
      scala> val f = new FieldSchema(c.name, c.dataType.catalogString, c.getComment().orNull)
      f: org.apache.hadoop.hive.metastore.api.FieldSchema = FieldSchema(name:INFO_ANN, type:array<struct<Allele:string,Annotation:array<string>,Annotation_Impact:string,Gene_Name:string,Gene_ID:string,Feature_Type:string,Feature_ID:string,Transcript_BioType:string,Rank:struct<rank:string,total:string>,HGVS_c:string,HGVS_p:string,cDNA_pos/cDNA_length:struct<pos:string,length:string>,CDS_pos/CDS_length:struct<pos:string,length:string>,AA_pos/AA_length:struct<pos:string,length:string>,Distance:int,ERRORS/WARNINGS/INFO:string>>, comment:null)
       
      scala> CatalystSqlParser.parseDataType(f.getType)
      org.apache.spark.sql.catalyst.parser.ParseException:
      Syntax error at or near '/': extra input '/'(line 1, pos 247)

      == SQL ==
      array<struct<Allele:string,Annotation:array<string>,Annotation_Impact:string,Gene_Name:string,Gene_ID:string,Feature_Type:string,Feature_ID:string,Transcript_BioType:string,Rank:struct<rank:string,total:string>,HGVS_c:string,HGVS_p:string,cDNA_pos/cDNA_length:struct<pos:string,length:string>,CDS_pos/CDS_length:struct<pos:string,length:string>,AA_pos/AA_length:struct<pos:string,length:string>,Distance:int,ERRORS/WARNINGS/INFO:string>>
      -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------^^^  at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:306)
        at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:143)
        at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseDataType(ParseDriver.scala:41)
        ... 49 elided
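
      The parse failure does not depend on our table. A minimal sketch, assuming spark-catalyst is on the classpath (for example inside spark-shell); the two short type strings are hypothetical stand-ins for the INFO_ANN type, and only the "/" in the field name matters:

```scala
import scala.util.Try
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// Hypothetical minimal type strings: one struct field whose name contains '/'.
val unquoted = "struct<a/b:string>"
val quoted   = "struct<`a/b`:string>"

// Unquoted: '/' is not part of an identifier, so the parser is expected
// to reject it with a ParseException, as in the trace above.
val unquotedResult = Try(CatalystSqlParser.parseDataType(unquoted))

// Backtick-quoted: the whole name should be read as a single identifier.
val quotedResult = Try(CatalystSqlParser.parseDataType(quoted))

println(s"unquoted parses: ${unquotedResult.isSuccess}")
println(s"quoted parses:   ${quotedResult.isSuccess}")
```

      Wrapping both calls in Try keeps the comparison side by side without the REPL aborting on the first exception.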

      Possible Solution:

      Would it be possible to generate c.dataType.catalogString with backtick-quoted (``) field names, such as:

      scala> c.dataType.catalogString
      res9: String = array<struct<Allele:string,Annotation:array<string>,Annotation_Impact:string,Gene_Name:string,Gene_ID:string,Feature_Type:string,Feature_ID:string,Transcript_BioType:string,Rank:struct<rank:string,total:string>,HGVS_c:string,HGVS_p:string,`cDNA_pos/cDNA_length`:struct<pos:string,length:string>,`CDS_pos/CDS_length`:struct<pos:string,length:string>,`AA_pos/AA_length`:struct<pos:string,length:string>,Distance:int,`ERRORS/WARNINGS/INFO`:string>>

      Then there would not be any SparkException when calling CatalystSqlParser.parseDataType(f.getType).
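
      To make the idea concrete, here is a rough sketch of such a rendering. QuotedTypeString, quoteIfNeeded, and render are hypothetical names, not Spark APIs, and this is not necessarily how the issue was fixed in Spark. It assumes that only names outside [A-Za-z_][A-Za-z0-9_]* need quoting, and we have not checked whether the Hive metastore accepts backticks inside a stored type string.

```scala
import org.apache.spark.sql.types._

// Hypothetical helper (not part of Spark): renders a DataType like
// catalogString, but backtick-quotes struct field names that are not
// plain identifiers.
object QuotedTypeString {
  // Assumption: names made of letters, digits, and '_' (not starting
  // with a digit) are safe to leave unquoted.
  def quoteIfNeeded(name: String): String =
    if (name.matches("[A-Za-z_][A-Za-z0-9_]*")) name
    else "`" + name.replace("`", "``") + "`" // double any embedded backticks

  def render(dt: DataType): String = dt match {
    case ArrayType(elementType, _) =>
      s"array<${render(elementType)}>"
    case MapType(keyType, valueType, _) =>
      s"map<${render(keyType)},${render(valueType)}>"
    case StructType(fields) =>
      fields
        .map(f => s"${quoteIfNeeded(f.name)}:${render(f.dataType)}")
        .mkString("struct<", ",", ">")
    case other =>
      other.catalogString // atomic types have no nested field names
  }
}

// A cut-down version of the INFO_ANN type from the description.
val sample = ArrayType(StructType(Seq(
  StructField("Allele", StringType),
  StructField("cDNA_pos/cDNA_length", StructType(Seq(
    StructField("pos", StringType),
    StructField("length", StringType))))
)))

println(QuotedTypeString.render(sample))
// array<struct<Allele:string,`cDNA_pos/cDNA_length`:struct<pos:string,length:string>>>
```

      Quoting only when needed keeps the string identical to today's catalogString for ordinary field names, so existing metastore entries would compare equal.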

       

      Thanks in advance for your help.

          People

            Assignee: Kent Yao (yao)
            Reporter: Yu-Ting LIN (yutinglin)