Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-22443

AggregatedDialect doesn't override quoteIdentifier and other methods in JdbcDialects

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.2.0
    • Fix Version/s: 2.3.0
    • Component/s: SQL
    • Labels:
      None

      Description

      The AggregatedDialect only implements canHandle, getCatalystType, and getJDBCType; it does not override the other methods of JdbcDialect.
      As a result, if multiple dialects are registered for the same driver, their implementations of those other methods are ignored and the default implementations in JdbcDialect are used instead.

      Example:

      package example
      
      import java.util.Properties
      
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
      import org.apache.spark.sql.types.{DataType, MetadataBuilder}
      
      // A second dialect matching the same jdbc:mysql URLs as Spark's built-in
      // MySQLDialect. Registering it forces Spark to combine both dialects into an
      // AggregatedDialect, which (before this fix) did not delegate quoteIdentifier.
      object AnotherMySQLDialect extends JdbcDialect {
        // Matches the same URL prefix as the built-in MySQL dialect on purpose,
        // so that both dialects handle the URL and get aggregated.
        override def canHandle(url : String): Boolean = url.startsWith("jdbc:mysql")
      
        // Returns None to defer the JDBC-to-Catalyst type mapping to other
        // dialects / the default mapping.
        override def getCatalystType(
                                      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
          None
        }
      
        // Quotes identifiers with backticks (MySQL quoting style). This is the
        // method that AggregatedDialect fails to delegate to.
        override def quoteIdentifier(colName: String): String = {
          s"`$colName`"
        }
      }
      
      object App {
        // Reproduction: register the extra MySQL dialect so Spark wraps both
        // matching dialects in an AggregatedDialect, then read a table. Because
        // AggregatedDialect does not delegate quoteIdentifier, the generated SQL
        // quotes column names with the default double quotes instead of backticks,
        // and MySQL returns the column name as a string value (see stack trace).
        def main(args: Array[String]) {
          val spark = SparkSession.builder.master("local").appName("Simple Application").getOrCreate()
          JdbcDialects.registerDialect(AnotherMySQLDialect)
          // NOTE(review): placeholder connection string — substitute real
          // host/port/db/credentials to run the repro.
          val jdbcUrl = s"jdbc:mysql://host:port/db?user=user&password=password"
          spark.read.jdbc(jdbcUrl, "badge", new Properties()).show()
        }
      }
      

      will throw an exception.

      17/11/03 17:08:39 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
      java.sql.SQLDataException: Cannot determine value type from string 'id'
      	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:530)
      	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:513)
      	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:505)
      	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:479)
      	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:489)
      	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:89)
      	at com.mysql.cj.jdbc.result.ResultSetImpl.getLong(ResultSetImpl.java:853)
      	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$8.apply(JdbcUtils.scala:409)
      	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$8.apply(JdbcUtils.scala:408)
      	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:330)
      	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:312)
      	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
      	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
      	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
      	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
      	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
      	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
      	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      	at org.apache.spark.scheduler.Task.run(Task.scala:108)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: com.mysql.cj.core.exceptions.DataConversionException: Cannot determine value type from string 'id'
      	at com.mysql.cj.core.io.StringConverter.createFromBytes(StringConverter.java:121)
      	at com.mysql.cj.core.io.MysqlTextValueDecoder.decodeByteArray(MysqlTextValueDecoder.java:232)
      	at com.mysql.cj.mysqla.result.AbstractResultsetRow.decodeAndCreateReturnValue(AbstractResultsetRow.java:124)
      	at com.mysql.cj.mysqla.result.AbstractResultsetRow.getValueFromBytes(AbstractResultsetRow.java:225)
      	at com.mysql.cj.mysqla.result.ByteArrayRow.getValue(ByteArrayRow.java:84)
      	at com.mysql.cj.jdbc.result.ResultSetImpl.getNonStringValueFromRow(ResultSetImpl.java:630)
      	... 24 more
      

      This failure occurs even though quoteIdentifier is correctly implemented in both Spark's built-in MySQLDialect and our AnotherMySQLDialect — the AggregatedDialect simply never delegates to either implementation.

        Attachments

          Activity

            People

            • Assignee:
              huaxing Huaxin Gao
              Reporter:
              liuhb86 Hongbo
            • Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: