Flink / FLINK-15566

Flink implicitly orders the fields in PojoTypeInfo


Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • 1.10.0
    • None
    • Component/s: Table SQL / API

    Description

      I don't know why Flink does this, but it makes my user-defined function behave incorrectly when I use a POJO as the result type of my UDF and override getResultType.

      https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java#L85
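      From the linked source, the PojoTypeInfo constructor appears to sort the supplied PojoField list by field name, so the order passed in from getResultType is not preserved. The sketch below mimics that effect in plain Scala without any Flink dependency. `FieldSpec`, `PojoOrderDemo` and `sortLikePojoTypeInfo` are hypothetical names, not Flink API, and only the first six Person fields are used.

```scala
// Hypothetical stand-in for a PojoField: just a field name and a type label.
final case class FieldSpec(name: String, typeLabel: String)

object PojoOrderDemo {
  // Assumption: mimics the name-based sort done in PojoTypeInfo's constructor
  // (plain lexicographic order of field names, like String.compareTo).
  def sortLikePojoTypeInfo(fields: Seq[FieldSpec]): Seq[FieldSpec] =
    fields.sortBy(_.name)

  // The first six fields of Person, in declaration order.
  val declared: Seq[FieldSpec] = Seq(
    FieldSpec("age", "INT"),
    FieldSpec("job", "STRING"),
    FieldSpec("marital", "STRING"),
    FieldSpec("education", "STRING"),
    FieldSpec("default", "STRING"),
    FieldSpec("balance", "STRING")
  )

  def main(args: Array[String]): Unit = {
    // Order as written by the UDF author.
    println(declared.map(_.name).mkString(", "))
    // Order after the name-based sort.
    println(sortLikePojoTypeInfo(declared).map(_.name).mkString(", "))
  }
}
```

      Running `main` prints the declared order (`age, job, marital, education, default, balance`) followed by the sorted one (`age, balance, default, education, job, marital`).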

       

      Here is the UDF I defined:

       

      
      %flink
      import org.apache.flink.api.common.typeinfo.Types
      import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo}
      import org.apache.flink.api.scala._
      import org.apache.flink.table.functions.TableFunction
      
      class Person(val age: Int, val job: String, val marital: String, val education: String,
                   val default: String, val balance: String, val housing: String, val loan: String,
                   val contact: String, val day: String, val month: String, val duration: Int,
                   val campaign: Int, val pdays: Int, val previous: Int, val poutcome: String,
                   val y: String)
      
      class ParseFunction extends TableFunction[Person] {
        def eval(line: String): Unit = {
          val tokens = line.split(";")
          // skip the CSV header line; parse every other line into a Person
          if (!line.startsWith("\"age\"")) {
            collect(new Person(
              tokens(0).toInt, normalize(tokens(1)), normalize(tokens(2)), normalize(tokens(3)),
              normalize(tokens(4)), normalize(tokens(5)), normalize(tokens(6)), normalize(tokens(7)),
              normalize(tokens(8)), normalize(tokens(9)), normalize(tokens(10)), tokens(11).toInt,
              tokens(12).toInt, tokens(13).toInt, tokens(14).toInt, normalize(tokens(15)),
              normalize(tokens(16))))
          }
        }
        
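        // Declare the POJO fields explicitly, in the same order as Person's constructor.
        // Note: PojoTypeInfo does not keep this order (see the linked source).
        override def getResultType() = {
          val cls = classOf[Person]
          new PojoTypeInfo[Person](cls, java.util.Arrays.asList(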
             new PojoField(cls.getDeclaredField("age"), Types.INT),
             new PojoField(cls.getDeclaredField("job"), Types.STRING),
             new PojoField(cls.getDeclaredField("marital"), Types.STRING),
             new PojoField(cls.getDeclaredField("education"), Types.STRING),
             new PojoField(cls.getDeclaredField("default"), Types.STRING),
             new PojoField(cls.getDeclaredField("balance"), Types.STRING), 
             new PojoField(cls.getDeclaredField("housing"), Types.STRING), 
             new PojoField(cls.getDeclaredField("loan"), Types.STRING), 
             new PojoField(cls.getDeclaredField("contact"), Types.STRING), 
             new PojoField(cls.getDeclaredField("day"), Types.STRING), 
             new PojoField(cls.getDeclaredField("month"), Types.STRING), 
             new PojoField(cls.getDeclaredField("duration"), Types.INT),
             new PojoField(cls.getDeclaredField("campaign"), Types.INT),
             new PojoField(cls.getDeclaredField("pdays"), Types.INT),
             new PojoField(cls.getDeclaredField("previous"), Types.INT),
             new PojoField(cls.getDeclaredField("poutcome"), Types.STRING),
             new PojoField(cls.getDeclaredField("y"), Types.STRING)
           ))
        }  
      
        // strip the surrounding double quotes, if present
        private def normalize(token: String) = {
            if (token.startsWith("\"")) {
                token.substring(1, token.length - 1)
            } else {
                token
            }
        }
      }

      I then use this UDF in SQL, but get wrong results because Flink implicitly reorders the fields.
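      The report does not include the SQL query, so the exact path to the wrong result is not shown. A plausible mechanism (an assumption, not confirmed by the report) is that something downstream, for example positional column aliases, pairs result values with the declared field order while the type information reports the fields sorted by name. The plain-Scala sketch below contrasts that positional pairing with a name-based one; `PositionalVsNamed` and its placeholder values are hypothetical.

```scala
object PositionalVsNamed {
  // Hypothetical placeholder values, keyed by the field they belong to.
  val truth: Map[String, String] =
    Map("age" -> "AGE", "job" -> "JOB", "marital" -> "MARITAL", "education" -> "EDUCATION")

  // Field order as written in getResultType (declaration order).
  val declaredOrder: Seq[String] = Seq("age", "job", "marital", "education")

  // Field order after a name-based sort, as the type information would report it.
  val typeInfoOrder: Seq[String] = declaredOrder.sorted

  // One row laid out in the order the type information reports.
  val row: Seq[String] = typeInfoOrder.map(truth)

  // Wrong: pair the row's values with the *declared* names purely by position.
  def byPosition: Map[String, String] = declaredOrder.zip(row).toMap

  // Order-independent: pair the row's values with the names the type information reports.
  def byName: Map[String, String] = typeInfoOrder.zip(row).toMap

  def main(args: Array[String]): Unit = {
    println(s"by position: $byPosition")
    println(s"by name:     $byName")
  }
}
```

      With positional pairing, `job` ends up holding the `education` value, while the name-based pairing reproduces the original mapping regardless of how the fields are ordered.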

          People

            Assignee: Unassigned
            Reporter: Jeff Zhang (zjffdu)
            Votes: 0
            Watchers: 4
