SPARK-37259: JDBC read always wraps the query in a SELECT statement


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.1.2
    • Fix Version/s: 3.4.0
    • Component/s: SQL
    • Labels: None
    • Flags: Patch, Important

    Description

      The JDBC read wraps the query it sends to the database server inside a SELECT statement, and there is currently no way to override this.

      I initially ran into this issue when trying to run a CTE query against SQL Server; it fails, and the details of the failure are covered in these cases:

      https://github.com/microsoft/mssql-jdbc/issues/1340

      https://github.com/microsoft/mssql-jdbc/issues/1657

      https://github.com/microsoft/sql-spark-connector/issues/147

      https://issues.apache.org/jira/browse/SPARK-32825

      https://issues.apache.org/jira/browse/SPARK-34928

      I started to patch the code to get the query to run and ran into a few different items. If there is a way to add these features so that this code path can run, it would be extremely helpful for running these types of edge-case queries. The examples here are basic; the actual queries are much more complex and would require significant time to rewrite.

      Inside JDBCOptions.scala the table name or query is set to one of two forms. Using the dbtable option takes the first form, which passes the value through without modification:

      name.trim

      or, when using the query option:

      s"(${subquery}) SPARK_GEN_SUBQ_${curId.getAndIncrement()}"
      

       

      Inside JDBCRelation.scala, Spark then tries to get the schema for this query, which ends up calling dialect.getSchemaQuery, which builds:

      s"SELECT * FROM $table WHERE 1=0"

      Overriding the dialect here so that it simply passes $table back unchanged gets past the schema check; a sketch of such an override follows.
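      A sketch of that override, assuming the public JdbcDialect API in org.apache.spark.sql.jdbc (the dialect object and its canHandle check are illustrative, not the exact patch):

      import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

      // Hypothetical dialect: return the "table" (which may be a full query)
      // unchanged instead of wrapping it in SELECT * FROM ... WHERE 1=0
      object PassThroughSchemaDialect extends JdbcDialect {
        override def canHandle(url: String): Boolean =
          url.startsWith("jdbc:sqlserver")

        override def getSchemaQuery(table: String): String = table
      }

      // Register it once before running the JDBC read:
      // JdbcDialects.registerDialect(PassThroughSchemaDialect)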

       

      val sqlText = s"SELECT $columnList FROM ${options.tableOrQuery} $myTableSampleClause" + s" $myWhereClause $getGroupByClause $myLimitClause"
       
      

       

      For these two query shapes, the CTE query and the one using temp tables, finding out the schema is difficult without actually running the query; and for the temp table, running the query during the schema check creates the table, so the actual query then fails because the table already exists.
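      To illustrate the temp table point with plain JDBC (a hypothetical sketch; the connection details are placeholders): executing the statement once to probe the schema leaves #Temp1a behind on the session, so the real run fails.

      import java.sql.DriverManager

      object TempTableRepro {
        def main(args: Array[String]): Unit = {
          val conn = DriverManager.getConnection(
            "jdbc:sqlserver://server:1433;databaseName=db", "user", "password")
          val stmt = conn.createStatement()
          val query = "SELECT * INTO #Temp1a FROM (SELECT @@VERSION as version) data"

          stmt.execute(query) // "schema check" run: creates #Temp1a in this session
          stmt.execute(query) // real run: fails because #Temp1a already exists
        }
      }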

       

      I patched around these with the following two changes:

      JDBCRDD.scala (compute)

       

      // When runQueryAsIs is set, send options.tableOrQuery to the database verbatim
      val runQueryAsIs = options.parameters.getOrElse("runQueryAsIs", "false").toBoolean
      val sqlText = if (runQueryAsIs) {
        options.tableOrQuery
      } else {
        s"SELECT $columnList FROM ${options.tableOrQuery} $myWhereClause"
      }
      
      

      JDBCRelation.scala (getSchema)

      // When useCustomSchema is set, skip resolving the schema against the
      // database and parse the user-supplied customSchema string instead
      val useCustomSchema = jdbcOptions.parameters.getOrElse("useCustomSchema", "false").toBoolean
      if (useCustomSchema) {
        val myCustomSchema = jdbcOptions.parameters.getOrElse("customSchema", "")
        val newSchema = CatalystSqlParser.parseTableSchema(myCustomSchema)
        logInfo(s"Going to return the new $newSchema because useCustomSchema is " +
          s"$useCustomSchema and passed in $myCustomSchema")
        newSchema
      } else {
        val tableSchema = JDBCRDD.resolveTable(jdbcOptions)
        jdbcOptions.customSchema match {
          case Some(customSchema) => JdbcUtils.getCustomSchema(
            tableSchema, customSchema, resolver)
          case None => tableSchema
        }
      }
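      The parser called there is Spark's own Catalyst SQL parser, so the mapping from the customSchema string to a StructType can be checked in isolation; a minimal sketch:

      import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

      object SchemaParseDemo {
        def main(args: Array[String]): Unit = {
          // Same call the patch makes; "DummyXCOL INT" matches the test schema below
          val schema = CatalystSqlParser.parseTableSchema("DummyXCOL INT")
          println(schema) // StructType(StructField(DummyXCOL,IntegerType,true))
        }
      }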

       

      This allows the query to run as-is via the dbtable option, with the provided custom schema bypassing the dialect schema check.

       

      Test queries

       

      query1 = """ 
      SELECT 1 as DummyCOL
      """
      query2 = """ 
      WITH DummyCTE AS
      (
      SELECT 1 as DummyCOL
      )
      SELECT *
      FROM DummyCTE
      """
      query3 = """
      (SELECT *
      INTO #Temp1a
      FROM
      (SELECT @@VERSION as version) data
      )
      (SELECT *
      FROM
      #Temp1a)
      """
      

       

      Test schema

       

      schema1 = """
      DummyXCOL INT
      """
      schema2 = """
      DummyXCOL STRING
      """
      

       

      Test code

       

      # queryx and schemax refer to one of the test queries/schemas defined above;
      # the useCustomSchema and runQueryAsIs options exist only with this patch
      jdbcDFWorking = (
          spark.read.format("jdbc")
          .option("url", f"jdbc:sqlserver://{server}:{port};databaseName={database};")
          .option("user", user)
          .option("password", password)
          .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
          .option("dbtable", queryx)
          .option("customSchema", schemax)
          .option("useCustomSchema", "true")
          .option("runQueryAsIs", "true")
          .load()
      )
       
      

       

      So far we have run into this with these two special SQL Server query types, but we aren't sure whether other databases we use would hit the same kind of issue. Before working through this, I didn't realize the query is always wrapped in the SELECT no matter what you do.

      This is on Spark 3.1.2, using PySpark with Python 3.7.11.

      Thank you for your consideration and assistance in finding a way to fix this.

      Kevin

       

       

       


          People

            Assignee: Peter Toth (petertoth)
            Reporter: Kevin Appel (KevinAppelBofa)