Details
-
New Feature
-
Status: Resolved
-
Not a Priority
-
Resolution: Fixed
-
1.10.0
Description
//sql
CREATE TABLE sink_kafka ( subtype STRING , svt STRING ) WITH (……)
//sql
CREATE TABLE source_kafka ( subtype STRING , svt STRING ) WITH (……)
//scala udf class ScalaUpper extends ScalarFunction { def eval(str: String) : String= { if(str == null){ return "" }else{ return str } } } btenv.registerFunction("scala_upper", new ScalaUpper())
//sql
insert into sink_kafka select subtype, scala_upper(svt) from source_kafka
Sometimes the svt's value is null, inert into kafkas json like {"subtype":"qin","svt":null}
If the amount of data is small, it is acceptable,but we process 10TB of data every day, and there may be many nulls in the json, which affects the efficiency. If you can add a parameter to remove the null key when defining a sinktable, the performance will be greatly improved