Flink / FLINK-4469

Add support for user defined table function in Table API & SQL


Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.2.0
    • Component/s: Table SQL / API
    • Labels: None

    Description

      Normal user-defined functions, such as concat(), take a single input row and produce a single output row. In contrast, table-generating functions transform a single input row into multiple output rows. This is useful, for example, to look up a row key in HBase and return one or more matching rows.
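To make the contrast concrete, here is a plain-Java sketch with no Flink dependency. The method names `concat` and `splitToRows` are hypothetical: the first maps one input to exactly one output, the second maps one input to zero or more output rows.

```java
import java.util.ArrayList;
import java.util.List;

public class ScalarVsTableFunction {

    // Scalar function: one input in, exactly one value out.
    static String concat(String a, String b) {
        return a + b;
    }

    // Table-generating function: one input in, zero or more rows out.
    static List<String> splitToRows(String str) {
        List<String> rows = new ArrayList<>();
        if (str != null) {
            for (String s : str.split(",")) {
                rows.add(s);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(concat("foo", "bar"));     // foobar
        System.out.println(splitToRows("a,bb,ccc"));  // [a, bb, ccc]
        System.out.println(splitToRows(null));        // []
    }
}
```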

      To add a user-defined table function, a user should:

      1. extend the UDTF class with a specific generic type T;
      2. define one or more eval methods.
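The two steps above amount to a small "collector" pattern: the base class fixes the row type T, and eval emits rows through collect(T) instead of returning them. The sketch below models that pattern in plain Java under stated assumptions. `SketchTableFunction`, its list buffer, and `drain()` are hypothetical stand-ins, not the UDTF class proposed here and not any Flink API.

```java
import java.util.ArrayList;
import java.util.List;

public class UdtfPatternSketch {

    // Hypothetical stand-in for the proposed UDTF<T> base class (not Flink's API).
    // Subclasses fix T and emit rows by calling collect(T) from their eval methods.
    abstract static class SketchTableFunction<T> {
        private final List<T> buffer = new ArrayList<>();

        // Called by user code inside eval(...) once per output row.
        protected final void collect(T row) {
            buffer.add(row);
        }

        // Called by a (hypothetical) runtime after each eval(...) call to fetch
        // the rows emitted for one input row; the buffer is then reset.
        List<T> drain() {
            List<T> rows = new ArrayList<>(buffer);
            buffer.clear();
            return rows;
        }
    }

    static class Word {
        final String word;
        final int length;

        Word(String word, int length) {
            this.word = word;
            this.length = length;
        }

        @Override
        public String toString() {
            return word + ":" + length;
        }
    }

    // Mirrors SplitStringUDTF: fixes T = Word and defines one eval method.
    static class SplitFunction extends SketchTableFunction<Word> {
        public void eval(String str) {
            if (str != null) {
                for (String s : str.split(",")) {
                    collect(new Word(s, s.length()));
                }
            }
        }
    }

    public static void main(String[] args) {
        SplitFunction split = new SplitFunction();
        split.eval("hello,flink");
        System.out.println(split.drain()); // [hello:5, flink:5]
        split.eval(null);
        System.out.println(split.drain()); // []
    }
}
```

Emitting through a collector lets one eval call produce any number of rows, including none, without the function having to build and return a collection.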

      NOTE:
      1. The eval method must be public and non-static.
      2. The generic type T is the row type returned by the table function. It is declared on the class because, due to Java type erasure, T cannot be extracted from an Iterable<T>.
      3. Use collect(T) to emit a table row.
      4. The eval method can be overloaded. Flink chooses the best-matching eval method according to the number and types of the arguments.
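Notes 1, 2 and 4 can be illustrated with plain Java reflection. The sketch below is an assumption-laden illustration, not the framework's implementation: `Base`, `Split`, `rowType` and `candidateEvals` are hypothetical names. It reads T from the subclass's generic superclass (the type argument survives in the class declaration even though instances are erased) and filters eval overloads by visibility, staticness, parameter count and parameter types. Picking the single most specific method when several candidates match is left out.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

public class EvalResolutionSketch {

    // Hypothetical generic base class standing in for UDTF<T>.
    abstract static class Base<T> {}

    static class Word {}

    // A function with two valid overloaded eval methods and one invalid (static) one.
    static class Split extends Base<Word> {
        public void eval(String str) {}
        public void eval(String str, String separator) {}
        public static void eval(Integer ignored) {} // static: must be ignored
    }

    // T is recoverable because "extends Base<Word>" keeps the type argument
    // in the class metadata, even though Base<T> instances are erased at runtime.
    static Type rowType(Class<?> functionClass) {
        ParameterizedType superType = (ParameterizedType) functionClass.getGenericSuperclass();
        return superType.getActualTypeArguments()[0];
    }

    // Simplified matching (an assumption, not the rules Flink actually applies):
    // keep public, non-static methods named "eval" whose parameter count matches
    // and whose parameter types accept the given argument classes.
    static List<Method> candidateEvals(Class<?> functionClass, Class<?>... argTypes) {
        List<Method> matches = new ArrayList<>();
        for (Method m : functionClass.getMethods()) {
            int mod = m.getModifiers();
            if (!m.getName().equals("eval") || Modifier.isStatic(mod) || !Modifier.isPublic(mod)) {
                continue;
            }
            Class<?>[] params = m.getParameterTypes();
            if (params.length != argTypes.length) {
                continue;
            }
            boolean ok = true;
            for (int i = 0; i < params.length; i++) {
                if (!params[i].isAssignableFrom(argTypes[i])) {
                    ok = false;
                    break;
                }
            }
            if (ok) {
                matches.add(m);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        System.out.println(rowType(Split.class).getTypeName());
        System.out.println(candidateEvals(Split.class, String.class).size());               // 1
        System.out.println(candidateEvals(Split.class, String.class, String.class).size()); // 1
        System.out.println(candidateEvals(Split.class, Integer.class).size());              // 0
    }
}
```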

      public class Word {
        public String word;
        public Integer length;

        // POJOs need a public no-argument constructor
        public Word() {}

        public Word(String word, Integer length) {
          this.word = word;
          this.length = length;
        }
      }
      
      public class SplitStringUDTF extends UDTF<Word> {
          // eval returns nothing; rows are emitted through collect(T)
          public void eval(String str) {
              if (str != null) {
                  for (String s : str.split(",")) {
                      collect(new Word(s, s.length()));
                  }
              }
          }
      }
      
      // in SQL
      tableEnv.registerFunction("split", new SplitStringUDTF());
      tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS t(w, l)");
      
      // in Java Table API
      tableEnv.registerFunction("split", new SplitStringUDTF());
      // rename the split table columns to "w" and "l"
      table.crossApply("split(c) as (w, l)")
           .select("a, b, w, l");
      // without renaming, the original field names of the POJO / case class are used
      table.crossApply("split(c)")
           .select("a, b, word, length");
      
      // in Scala Table API
      val split = new SplitStringUDTF()
      table.crossApply(split('c) as ('w, 'l))
           .select('a, 'b, 'w, 'l)
      // use outerApply for an outer join with the UDTF
      table.outerApply(split('c))
           .select('a, 'b, 'word, 'length)
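The SQL query and the crossApply/outerApply calls above differ mainly in how they treat an input row for which the function emits nothing. The plain-Java sketch below models those semantics, assuming crossApply (like LATERAL TABLE in a comma join) behaves as an inner lateral join and outerApply as a left outer lateral join. It is not Flink code: `apply`, `split` and the list-of-lists row representation are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class ApplySemanticsSketch {

    // Joins each input row with every row the table function emits for it.
    // outer == false: like crossApply, input rows with no output are dropped.
    // outer == true: like outerApply, such rows are kept and padded with nulls.
    static List<List<Object>> apply(List<List<Object>> input,
                                    Function<List<Object>, List<List<Object>>> udtf,
                                    int udtfArity,
                                    boolean outer) {
        List<List<Object>> result = new ArrayList<>();
        for (List<Object> row : input) {
            List<List<Object>> emitted = udtf.apply(row);
            if (emitted.isEmpty() && outer) {
                List<Object> padded = new ArrayList<>(row);
                for (int i = 0; i < udtfArity; i++) {
                    padded.add(null);
                }
                result.add(padded);
            }
            for (List<Object> out : emitted) {
                List<Object> joined = new ArrayList<>(row);
                joined.addAll(out);
                result.add(joined);
            }
        }
        return result;
    }

    // Mirrors SplitStringUDTF: one (word, length) row per comma-separated token.
    static List<List<Object>> split(String str) {
        List<List<Object>> rows = new ArrayList<>();
        if (str != null) {
            for (String s : str.split(",")) {
                rows.add(Arrays.<Object>asList(s, s.length()));
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        // Input rows (a, c); the second row has c = null, so split emits nothing for it.
        List<List<Object>> table = Arrays.<List<Object>>asList(
                Arrays.<Object>asList(1, "x,yy"),
                Arrays.<Object>asList(2, null));
        Function<List<Object>, List<List<Object>>> splitC = row -> split((String) row.get(1));

        System.out.println(apply(table, splitC, 2, false).size()); // 2: row a=2 is dropped
        System.out.println(apply(table, splitC, 2, true).size());  // 3: row a=2 kept, padded with nulls
    }
}
```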
      

      See [1] for more information about the UDTF design.

      [1] https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit#

      People

        Assignee: Jark Wu
        Reporter: Jark Wu
        Votes: 0
        Watchers: 7
