Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-16627

Support only generate non-null values when serializing into JSON

    XMLWordPrintableJSON

Details

    Description

      //sql
      CREATE TABLE sink_kafka ( subtype STRING , svt STRING ) WITH (……)
      

       

      //sql
      CREATE TABLE source_kafka ( subtype STRING , svt STRING ) WITH (……)
      

       

      //scala udf
      class ScalaUpper extends ScalarFunction {    
      def eval(str: String) : String= { 
             if(str == null){
                 return ""
             }else{
                 return str
             }
          }
          
      }
      btenv.registerFunction("scala_upper", new ScalaUpper())
      

       

      //sql
      insert into sink_kafka select subtype, scala_upper(svt)  from source_kafka
      

       

       


      Sometimes the svt's value is null, inert into kafkas json like  {"subtype":"qin","svt":null}

      If the amount of data is small, it is acceptable,but we process 10TB of data every day, and there may be many nulls in the json, which affects the efficiency. If you can add a parameter to remove the null key when defining a sinktable, the performance will be greatly improved

       

       

       

       

      Attachments

        Activity

          People

            nilerzhou yisha zhou
            jackray jackray wang
            Votes:
            0 Vote for this issue
            Watchers:
            9 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: