SPARK-46934

Read/write roundtrip for struct type with special characters with HMS


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 4.0.0
    • Fix Version/s: 4.0.0
    • Component/s: SQL
    • Environment: Tested in Spark 3.3.0, 3.3.2.

    Description

      We are trying to create a Hive view using the following SQL command: "CREATE OR REPLACE VIEW yuting AS SELECT INFO_ANN FROM table_2611810".

      Our table_2611810 has certain columns that contain special characters such as "/". Here is the schema of this table:

      contigName              string
      start                   bigint
      end                     bigint
      names                   array<string>
      referenceAllele         string
      alternateAlleles        array<string>
      qual                    double
      filters                 array<string>
      splitFromMultiAllelic    boolean
      INFO_NCAMP              int
      INFO_ODDRATIO           double
      INFO_NM                 double
      INFO_DBSNP_CAF          array<string>
      INFO_SPANPAIR           int
      INFO_TLAMP              int
      INFO_PSTD               double
      INFO_QSTD               double
      INFO_SBF                double
      INFO_AF                 array<double>
      INFO_QUAL               double
      INFO_SHIFT3             int
      INFO_VARBIAS            string
      INFO_HICOV              int
      INFO_PMEAN              double
      INFO_MSI                double
      INFO_VD                 int
      INFO_DP                 int
      INFO_HICNT              int
      INFO_ADJAF              double
      INFO_SVLEN              int
      INFO_RSEQ               string
      INFO_MSigDb             array<string>
      INFO_NMD                array<string>
      INFO_ANN                array<struct<Allele:string,Annotation:array<string>,Annotation_Impact:string,Gene_Name:string,Gene_ID:string,Feature_Type:string,Feature_ID:string,Transcript_BioType:string,Rank:struct<rank:string,total:string>,HGVS_c:string,HGVS_p:string,cDNA_pos/cDNA_length:struct<pos:string,length:string>,CDS_pos/CDS_length:struct<pos:string,length:string>,AA_pos/AA_length:struct<pos:string,length:string>,Distance:int,ERRORS/WARNINGS/INFO:string>>
      INFO_BIAS               string
      INFO_MQ                 double
      INFO_HIAF               double
      INFO_END                int
      INFO_SPLITREAD          int
      INFO_GDAMP              int
      INFO_LSEQ               string
      INFO_LOF                array<string>
      INFO_SAMPLE             string
      INFO_AMPFLAG            int
      INFO_SN                 double
      INFO_SVTYPE             string
      INFO_TYPE               string
      INFO_MSILEN             double
      INFO_DUPRATE            double
      INFO_DBSNP_COMMON       int
      INFO_REFBIAS            string
      genotypes               array<struct<sampleId:string,alleleDepths:array<int>,ALD:array<int>,AF:array<double>,phased:boolean,calls:array<int>,VD:int,depth:int,RD:array<int>>> 

      You can see that column INFO_ANN is an array of structs, and it contains fields whose names include "/", such as "cDNA_pos/cDNA_length".
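      For illustration, here is a minimal reproduction sketch in spark-shell. The table and view names are made up, and it assumes the backing table can be persisted as a data source table (whose schema is stored in table properties rather than as Hive-compatible columns); the failure then shows up when the view's columns are written to the metastore.

      import org.apache.spark.sql.Row
      import org.apache.spark.sql.types._

      // Hypothetical minimal schema: a struct field whose name contains "/".
      val inner = StructType(Seq(
        StructField("pos", StringType),
        StructField("length", StringType)))
      val repro = StructType(Seq(
        StructField("id", LongType),
        StructField("info", ArrayType(StructType(Seq(
          StructField("cDNA_pos/cDNA_length", inner)))))))

      // Creating the table is expected to succeed (Spark may warn that the
      // schema is not Hive compatible) ...
      spark.createDataFrame(spark.sparkContext.emptyRDD[Row], repro)
        .write.format("parquet").saveAsTable("t_special_chars")

      // ... but creating a view over it goes through verifyColumnDataType and
      // fails with "Cannot recognize hive type string" on Spark 3.3.x.
      spark.sql("CREATE OR REPLACE VIEW v_special_chars AS SELECT info FROM t_special_chars")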

      We believe this is the root cause of the following SparkException:

      scala> val schema = spark.sql("CREATE OR REPLACE VIEW yuting AS SELECT INFO_ANN FROM table_2611810")
      24/01/31 07:50:02.658 [main] WARN  o.a.spark.sql.catalyst.util.package - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
      org.apache.spark.SparkException: Cannot recognize hive type string: array<struct<Allele:string,Annotation:array<string>,Annotation_Impact:string,Gene_Name:string,Gene_ID:string,Feature_Type:string,Feature_ID:string,Transcript_BioType:string,Rank:struct<rank:string,total:string>,HGVS_c:string,HGVS_p:string,cDNA_pos/cDNA_length:struct<pos:string,length:string>,CDS_pos/CDS_length:struct<pos:string,length:string>,AA_pos/AA_length:struct<pos:string,length:string>,Distance:int,ERRORS/WARNINGS/INFO:string>>, column: INFO_ANN
        at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotRecognizeHiveTypeError(QueryExecutionErrors.scala:1455)
        at org.apache.spark.sql.hive.client.HiveClientImpl$.getSparkSQLDataType(HiveClientImpl.scala:1022)
        at org.apache.spark.sql.hive.client.HiveClientImpl$.$anonfun$verifyColumnDataType$1(HiveClientImpl.scala:1037)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at org.apache.spark.sql.types.StructType.foreach(StructType.scala:102)
        at org.apache.spark.sql.hive.client.HiveClientImpl$.org$apache$spark$sql$hive$client$HiveClientImpl$$verifyColumnDataType(HiveClientImpl.scala:1037)
        at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$createTable$1(HiveClientImpl.scala:553)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:294)
        at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:225)
        at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:224)
        at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:274)
        at org.apache.spark.sql.hive.client.HiveClientImpl.createTable(HiveClientImpl.scala:552)
        at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$createTable$1(HiveExternalCatalog.scala:286)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:101)
        at org.apache.spark.sql.hive.HiveExternalCatalog.createTable(HiveExternalCatalog.scala:244)
        at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.createTable(ExternalCatalogWithListener.scala:94)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createTable(SessionCatalog.scala:373)
        at org.apache.spark.sql.execution.command.CreateViewCommand.run(views.scala:166)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
        ... 49 elided
      Caused by: org.apache.spark.sql.catalyst.parser.ParseException:
      Syntax error at or near '/': extra input '/'(line 1, pos 247)

      == SQL ==
      array<struct<Allele:string,Annotation:array<string>,Annotation_Impact:string,Gene_Name:string,Gene_ID:string,Feature_Type:string,Feature_ID:string,Transcript_BioType:string,Rank:struct<rank:string,total:string>,HGVS_c:string,HGVS_p:string,cDNA_pos/cDNA_length:struct<pos:string,length:string>,CDS_pos/CDS_length:struct<pos:string,length:string>,AA_pos/AA_length:struct<pos:string,length:string>,Distance:int,ERRORS/WARNINGS/INFO:string>>
      -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------^^^  at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:306)
        at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:143)
        at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseDataType(ParseDriver.scala:41)
        at org.apache.spark.sql.hive.client.HiveClientImpl$.getSparkSQLDataType(HiveClientImpl.scala:1019)
        ... 101 more 

      We have checked the recent code at https://github.com/apache/spark/blob/88f121c47778f0755862046d09484a83932cb30b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala#L1045; it seems that if we start from our StructType (INFO_ANN) and generate a Hive column as a FieldSchema, we are still blocked in the function getSparkSQLDataType in Line 1058.
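      For reference, that helper is roughly the following (a simplified paraphrase of the linked source, so details may differ): the Hive FieldSchema type string is handed straight back to CatalystSqlParser, which rejects the unquoted "/". The spark-shell session below walks through the same path manually.

      // Rough paraphrase of HiveClientImpl.getSparkSQLDataType at the linked commit
      // (simplified; see the linked source for the exact code).
      private def getSparkSQLDataType(hc: FieldSchema): DataType = {
        try {
          CatalystSqlParser.parseDataType(hc.getType)
        } catch {
          case e: ParseException =>
            throw QueryExecutionErrors.cannotRecognizeHiveTypeError(e, hc.getType, hc.getName)
        }
      }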

      scala> val c = schema.fields(33)
      c: org.apache.spark.sql.types.StructField = StructField(INFO_ANN,ArrayType(StructType(StructField(Allele,StringType,true),StructField(Annotation,ArrayType(StringType,true),true),StructField(Annotation_Impact,StringType,true),StructField(Gene_Name,StringType,true),StructField(Gene_ID,StringType,true),StructField(Feature_Type,StringType,true),StructField(Feature_ID,StringType,true),StructField(Transcript_BioType,StringType,true),StructField(Rank,StructType(StructField(rank,StringType,true),StructField(total,StringType,true)),true),StructField(HGVS_c,StringType,true),StructField(HGVS_p,StringType,true),StructField(cDNA_pos/cDNA_length,StructType(StructField(pos,StringType,true),StructField(length,StringType,true)),true),StructField(CDS_pos/CDS_length,StructType(St...
      
      scala> c.dataType.catalogString
      res9: String = array<struct<Allele:string,Annotation:array<string>,Annotation_Impact:string,Gene_Name:string,Gene_ID:string,Feature_Type:string,Feature_ID:string,Transcript_BioType:string,Rank:struct<rank:string,total:string>,HGVS_c:string,HGVS_p:string,cDNA_pos/cDNA_length:struct<pos:string,length:string>,CDS_pos/CDS_length:struct<pos:string,length:string>,AA_pos/AA_length:struct<pos:string,length:string>,Distance:int,ERRORS/WARNINGS/INFO:string>>
      
      scala> val f = new FieldSchema(c.name, c.dataType.catalogString, c.getComment().orNull)
      f: org.apache.hadoop.hive.metastore.api.FieldSchema = FieldSchema(name:INFO_ANN, type:array<struct<Allele:string,Annotation:array<string>,Annotation_Impact:string,Gene_Name:string,Gene_ID:string,Feature_Type:string,Feature_ID:string,Transcript_BioType:string,Rank:struct<rank:string,total:string>,HGVS_c:string,HGVS_p:string,cDNA_pos/cDNA_length:struct<pos:string,length:string>,CDS_pos/CDS_length:struct<pos:string,length:string>,AA_pos/AA_length:struct<pos:string,length:string>,Distance:int,ERRORS/WARNINGS/INFO:string>>, comment:null)
       
      scala> CatalystSqlParser.parseDataType(f.getType)
      org.apache.spark.sql.catalyst.parser.ParseException:
      Syntax error at or near '/': extra input '/'(line 1, pos 247)

      == SQL ==
      array<struct<Allele:string,Annotation:array<string>,Annotation_Impact:string,Gene_Name:string,Gene_ID:string,Feature_Type:string,Feature_ID:string,Transcript_BioType:string,Rank:struct<rank:string,total:string>,HGVS_c:string,HGVS_p:string,cDNA_pos/cDNA_length:struct<pos:string,length:string>,CDS_pos/CDS_length:struct<pos:string,length:string>,AA_pos/AA_length:struct<pos:string,length:string>,Distance:int,ERRORS/WARNINGS/INFO:string>>
      -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------^^^  at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:306)
        at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:143)
        at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseDataType(ParseDriver.scala:41)
        ... 49 elided

      Possible Solution:

      Would it be possible to generate c.dataType.catalogString with backtick-quoted (``) field names, such as:

      scala> c.dataType.catalogString
      res9: String = array<struct<Allele:string,Annotation:array<string>,Annotation_Impact:string,Gene_Name:string,Gene_ID:string,Feature_Type:string,Feature_ID:string,Transcript_BioType:string,Rank:struct<rank:string,total:string>,HGVS_c:string,HGVS_p:string,`cDNA_pos/cDNA_length`:struct<pos:string,length:string>,`CDS_pos/CDS_length`:struct<pos:string,length:string>,`AA_pos/AA_length`:struct<pos:string,length:string>,Distance:int,ERRORS/WARNINGS/INFO:string>>

      Then there would be no SparkException when calling CatalystSqlParser.parseDataType(f.getType).
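      As an illustration only (this is a hypothetical helper, not an existing Spark API and not necessarily how a fix should be implemented), the quoting could be done by rebuilding the catalog string recursively and backtick-quoting any struct field name that is not a plain identifier:

      import org.apache.spark.sql.types._

      // Hypothetical sketch: emit a catalog-style type string with backtick-quoted
      // struct field names so CatalystSqlParser.parseDataType can round-trip it.
      def quotedCatalogString(dt: DataType): String = dt match {
        case StructType(fields) =>
          fields.map { f =>
            val name =
              if (f.name.matches("[a-zA-Z_][a-zA-Z0-9_]*")) f.name
              else "`" + f.name.replace("`", "``") + "`"
            s"$name:${quotedCatalogString(f.dataType)}"
          }.mkString("struct<", ",", ">")
        case ArrayType(elementType, _) => s"array<${quotedCatalogString(elementType)}>"
        case MapType(keyType, valueType, _) =>
          s"map<${quotedCatalogString(keyType)},${quotedCatalogString(valueType)}>"
        case other => other.catalogString
      }

      // For example, quotedCatalogString(c.dataType) produces a string like the one
      // above (with every "/"-containing field name quoted), which
      // CatalystSqlParser.parseDataType parses without error.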

       

      Thanks in advance for your help.


          People

            Assignee: Kent Yao (yao)
            Reporter: Yu-Ting LIN (yutinglin)
            Votes: 0
            Watchers: 5

