Description
The read jdbc is wrapping the query it sends to the database server inside a select statement and there is no way to override this currently.
Initially I ran into this issue when trying to run a CTE query against SQL server and it fails, the details of the failure is in these cases:
https://github.com/microsoft/mssql-jdbc/issues/1340
https://github.com/microsoft/mssql-jdbc/issues/1657
https://github.com/microsoft/sql-spark-connector/issues/147
https://issues.apache.org/jira/browse/SPARK-32825
https://issues.apache.org/jira/browse/SPARK-34928
I started to patch the code to get the query to run and ran into a few different items, if there is a way to add these features to allow this code path to run, this would be extremely helpful to running these type of edge case queries. These are basic examples here the actual queries are much more complex and would require significant time to rewrite.
Inside JDBCOptions.scala the query is being set to either, using the dbtable this allows the query to be passed without modification
name.trim
or
s"(${subquery}) SPARK_GEN_SUBQ_${curId.getAndIncrement()}"
Inside JDBCRelation.scala this is going to try to get the schema for this query, and this ends up running dialect.getSchemaQuery which is doing:
s"SELECT * FROM $table WHERE 1=0"
Overriding the dialect here and initially just passing back the $table gets passed here and to the next issue which is in the compute function in JDBCRDD.scala
val sqlText = s"SELECT $columnList FROM ${options.tableOrQuery} $myTableSampleClause" + s" $myWhereClause $getGroupByClause $myLimitClause"
For these two queries, about a CTE query and using temp tables, finding out the schema is difficult without actually running the query and for the temp table if you run it in the schema check that will have the table now exist and fail when it runs the actual query.
The way I patched these is by doing these two items:
JDBCRDD.scala (compute)
val runQueryAsIs = options.parameters.getOrElse("runQueryAsIs", "false").toBoolean val sqlText = if (runQueryAsIs) { s"${options.tableOrQuery}" } else { s"SELECT $columnList FROM ${options.tableOrQuery} $myWhereClause" }
JDBCRelation.scala (getSchema)
val useCustomSchema = jdbcOptions.parameters.getOrElse("useCustomSchema", "false").toBoolean if (useCustomSchema) { val myCustomSchema = jdbcOptions.parameters.getOrElse("customSchema", "").toString val newSchema = CatalystSqlParser.parseTableSchema(myCustomSchema) logInfo(s"Going to return the new $newSchema because useCustomSchema is $useCustomSchema and passed in $myCustomSchema") newSchema } else { val tableSchema = JDBCRDD.resolveTable(jdbcOptions) jdbcOptions.customSchema match { case Some(customSchema) => JdbcUtils.getCustomSchema( tableSchema, customSchema, resolver) case None => tableSchema } }
This is allowing the query to run as is, by using the dbtable option and then provide a custom schema that will bypass the dialect schema check
Test queries
query1 = """ SELECT 1 as DummyCOL """ query2 = """ WITH DummyCTE AS ( SELECT 1 as DummyCOL ) SELECT * FROM DummyCTE """ query3 = """ (SELECT * INTO #Temp1a FROM (SELECT @@VERSION as version) data ) (SELECT * FROM #Temp1a) """
Test schema
schema1 = """ DummyXCOL INT """ schema2 = """ DummyXCOL STRING """
Test code
jdbcDFWorking = ( spark.read.format("jdbc") .option("url", f"jdbc:sqlserver://{server}:{port};databaseName={database};") .option("user", user) .option("password", password) .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") .option("dbtable", queryx) .option("customSchema", schemax) .option("useCustomSchema", "true") .option("runQueryAsIs", "true") .load() )
Currently we ran into this on these two special SQL server queries however we aren't sure if there is other DB's we are using that we haven't hit this type of issue yet, without going through this I didn't realize the query is always wrapped in the SELECT no matter what you do.
This is on the Spark 3.1.2 and using the PySpark with the Python 3.7.11
Thank you for your consideration and assistance to a way to fix this
Kevin