FLINK-7657: SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.3.1, 1.3.2, 1.4.0
    • Fix Version/s: 1.4.0, 1.5.0
    • Component/s: Table SQL / API
    • Labels: None

    Description

      I have a SQL statement using the Table API that contains a timestamp literal. When the execution environment tries to optimize the SQL, it throws the exception attached below. As a result, any SQL query with a timestamp, date, or time literal fails to execute if any table source implements FilterableTableSource.
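
      For example, the query used in the reproduction code below fails during optimization:

          SELECT * FROM test_table WHERE last_updated > DATE '2017-05-01'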

       
      Exception in thread "main" java.lang.RuntimeException: Error while applying rule PushFilterIntoTableSourceScanRule, args [rel#30:FlinkLogicalCalc.LOGICAL(input=rel#29:Subset#0.LOGICAL,expr#0..1={inputs},expr#2=2017-05-01,expr#3=>($t1, $t2),data=$t0,last_updated=$t1,$condition=$t3), Scan(table:[test_table], fields:(data, last_updated))]
      	at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
      	at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
      	at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
      	at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:266)
      	at org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:298)
      	at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:328)
      	at org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:135)
      	at org.apache.flink.table.api.Table.writeToSink(table.scala:800)
      	at org.apache.flink.table.api.Table.writeToSink(table.scala:773)
      	at com.remitly.flink.TestReproductionApp$.delayedEndpoint$com$remitly$flink$TestReproductionApp$1(TestReproductionApp.scala:27)
      	at com.remitly.flink.TestReproductionApp$delayedInit$body.apply(TestReproductionApp.scala:22)
      	at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
      	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
      	at scala.App$$anonfun$main$1.apply(App.scala:76)
      	at scala.App$$anonfun$main$1.apply(App.scala:76)
      	at scala.collection.immutable.List.foreach(List.scala:381)
      	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
      	at scala.App$class.main(App.scala:76)
      	at com.remitly.flink.TestReproductionApp$.main(TestReproductionApp.scala:22)
      	at com.remitly.flink.TestReproductionApp.main(TestReproductionApp.scala)
      Caused by: java.lang.ClassCastException: java.util.GregorianCalendar cannot be cast to java.util.Date
      	at org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:107)
      	at org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:80)
      	at org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
      	at org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.immutable.List.foreach(List.scala:381)
      	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      	at scala.collection.immutable.List.map(List.scala:285)
      	at org.apache.flink.table.expressions.BinaryComparison.toRexNode(comparison.scala:35)
      	at org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
      	at org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
      	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
      	at org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.pushFilterIntoScan(PushFilterIntoTableSourceScanRule.scala:92)
      	at org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.onMatch(PushFilterIntoTableSourceScanRule.scala:56)
      	at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:211)
      	... 19 more
      

      I've done quite a bit of debugging on this and tracked it down to a problem with the way the Calcite AST is translated into an Expression tree for the predicates. Calcite parses timestamps as Calendar values, and you'll note in RexNodeToExpressionConverter that the Calendar value is passed as-is to the Literal, which does no conversion of the value. The Literal, in turn, expects the value to be a java.util.Date subclass, which is where the ClassCastException arises.

      I've done some informal testing of a bugfix where I convert the calendars to java.sql.Date/java.sql.Time/java.sql.Timestamp in RexNodeToExpressionConverter and had good results. A sketch of that conversion is below, followed by reproduction code in Scala. I am using Flink version 1.3.2 and running it in local mode (right-click + Run As in IntelliJ).
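
      A minimal, hypothetical sketch of that conversion (the helper name, signature, and the SqlTypeName dispatch are my illustration, not the actual patch):

      object CalendarLiteralConversion {
        import java.sql.{Date, Time, Timestamp}
        import java.util.Calendar

        import org.apache.calcite.sql.type.SqlTypeName

        // Hypothetical helper: map the Calendar that Calcite produces for a
        // DATE/TIME/TIMESTAMP literal onto the java.sql type that Literal expects.
        def calendarToSqlValue(cal: Calendar, typeName: SqlTypeName): Any = typeName match {
          case SqlTypeName.DATE      => new Date(cal.getTimeInMillis)
          case SqlTypeName.TIME      => new Time(cal.getTimeInMillis)
          case SqlTypeName.TIMESTAMP => new Timestamp(cal.getTimeInMillis)
          case _                     => cal // other literal kinds are unaffected
        }
      }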

      package kmurra
      
      import java.sql.Date
      import java.util
      
      import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
      import org.apache.flink.api.java
      import org.apache.flink.api.java.DataSet
      import org.apache.flink.api.java.typeutils.RowTypeInfo
      import org.apache.flink.api.scala.ExecutionEnvironment
      import org.apache.flink.table.api.TableEnvironment
      import org.apache.flink.table.api.scala.BatchTableEnvironment
      import org.apache.flink.table.expressions.Expression
      import org.apache.flink.table.sinks.{BatchTableSink, TableSinkBase}
      import org.apache.flink.table.sources.{BatchTableSource, FilterableTableSource, TableSource}
      import org.apache.flink.types.Row
      
      import scala.collection.mutable.ListBuffer
      import scala.collection.JavaConversions._
      
      
      // Registering a FilterableTableSource and querying with a DATE literal
      // is enough to trigger the ClassCastException during optimization.
      object TestReproductionApp extends App {
        val tables: BatchTableEnvironment = TableEnvironment.getTableEnvironment(ExecutionEnvironment.getExecutionEnvironment)
        val source = new TestTableSource
        val sink = new PrintTableSink()
        tables.registerTableSource("test_table", source)
        tables.sql("SELECT * FROM test_table WHERE last_updated > DATE '2017-05-01'").writeToSink(sink)
      }
      
      // Minimal batch sink that prints the resulting rows to stdout.
      class PrintTableSink() extends TableSinkBase[Row] with BatchTableSink[Row] {
        def emitDataSet(dataSet: DataSet[Row]): Unit = dataSet.print()
        def getOutputType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
        protected def copy: TableSinkBase[Row] = new PrintTableSink()
      }
      
      // Table source that advertises filter push-down. applyPredicate returns a
      // copy flagged as pushed-down without consuming any predicates, which is
      // all it takes for PushFilterIntoTableSourceScanRule to fire.
      class TestTableSource(val isFilterPushedDown: Boolean = false) extends BatchTableSource[Row] with FilterableTableSource[Row] {
        val getReturnType: RowTypeInfo = {
          val typeInfo = Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, SqlTimeTypeInfo.DATE)
          val fieldNames = Array("data", "last_updated")
          new RowTypeInfo(typeInfo, fieldNames)
        }
      
        def applyPredicate(predicates: util.List[Expression]): TableSource[Row] = new TestTableSource(true)
      
        def getDataSet(execEnv: java.ExecutionEnvironment): java.DataSet[Row] = {
          execEnv.fromCollection({
            val data = ListBuffer[Row]()
            data += row("Success!", Date.valueOf("2017-09-01"))
            data += row("Failure!", Date.valueOf("2017-01-01"))
            data
          })
        }
      
        def row(data: String, lastUpdated: Date): Row = {
          val row = new Row(2)
          row.setField(0, data)
          row.setField(1, lastUpdated)
          row
        }
      }
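
      For reference, once the literal conversion is fixed this program should print only the "Success!" row, since 2017-09-01 is the only last_updated value greater than the DATE '2017-05-01' in the predicate.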
      
      

      The build system is SBT:

       
      name := "kmurra-flink-reproduction"
      organization := "kmurra" 
      version := "1.0"
      
      scalaVersion := "2.11.8"
      
      resolvers ++= Seq("Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots/", Resolver.mavenLocal)
      
      val flinkVersion = "1.3.2"
      
      
      libraryDependencies ++= Seq(
        "org.apache.flink" %% "flink-scala"           % flinkVersion, // % "provided"
        "org.apache.flink" %% "flink-table"           % flinkVersion, // % "provided"
        "org.apache.flink" %% "flink-avro"            % flinkVersion, // % "provided"
        "org.apache.flink" %% "flink-streaming-scala" % flinkVersion, // % "provided"
        "org.apache.flink" %  "flink-jdbc"            % flinkVersion  // % "provided"
      )
      
      assemblyMergeStrategy in assembly := {
        case PathList("META-INF", xs @ _*) => MergeStrategy.discard
        case x => MergeStrategy.first
      }
      
      // exclude Scala library from assembly
      assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
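
      Because the "provided" scopes are commented out, the reproduction should also run from the command line with sbt run, in addition to running directly from IntelliJ as noted above.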
      

    People

      Assignee: Kent Murra (kmurra)
      Reporter: Kent Murra (kmurra)
      Votes: 0
      Watchers: 5
