Flink / FLINK-18961

When a FlatMap is chained to a Map and the map function returns null, an exception is thrown in FlatMap


Details

    • Type: Bug
    • Status: Closed
    • Priority: Minor
    • Resolution: Won't Do
    • Affects Version/s: 1.11.0
    • Fix Version/s: None
    • Component/s: API / DataSet
    • Labels: None
    • Environment: Mac OS 10.13.6, Kubernetes 1.16.8, Flink 1.11.0

    Description

      I found a DataSet problem. When a FlatMap is chained to a Map and the map function returns null, an exception is thrown in FlatMap. I think it's a problem with the operator chain. I will attach screenshots of the corresponding call stack.

      text.filter(value -> value.f0.contains("any"))
          .flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
              @Override
              public void flatMap(Tuple2<String, String> value, Collector<String> out) throws Exception {
                  Pattern pattern = Pattern.compile("\".*\"");
                  Matcher matcher = pattern.matcher(value.f0);
                  if (matcher.find()) {
                      String match = matcher.group(0);
                      out.collect(match); // the exception is thrown from this call
                  }
              }
          })
          .map(value -> {
              try {
                  String jsonS = value.replace("\"\"", "\"");
                  jsonS = jsonS.substring(1, jsonS.length() - 1);
                  JSONObject json = JSONObject.parseObject(jsonS);
                  String result = json.getJSONObject("body").getJSONObject("message").getString("data");
                  return result; // result is null in the failing case
              } catch (Exception e) {
                  return value;
              }
          })
          .print();
      
      
      Caused by: java.lang.NullPointerException: The system does not support records that are null. Null values are only supported as fields inside other objects.
          at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:76)
          at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
          at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
          at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
          at com.lemonbox.Test$1.flatMap(Test.java:42)
          at com.lemonbox.Test$1.flatMap(Test.java:35)
          at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
          at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
          at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:58)
          at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
          at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
          at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:196)
          at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
          at java.lang.Thread.run(Thread.java:748)
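
      The stack trace suggests why the error appears inside FlatMap even though the null comes from the map function: in a chain, out.collect(...) calls the map step and the output collector synchronously, so the null check fails within the flatMap call stack. The following is a minimal plain-Java sketch of that behavior, not Flink code. The Collector interface, the helper names, and the extract function are hypothetical stand-ins chosen for illustration. It also shows one possible workaround, assuming that dropping null results is acceptable: skip nulls instead of forwarding them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Simplified model of operator chaining. These types are illustrative
// stand-ins and are NOT Flink's actual classes.
final class ChainedNullDemo {

    /** Minimal stand-in for a collector: receives one record at a time. */
    interface Collector<T> {
        void collect(T record);
    }

    /** End of the chain: rejects null records, as the reported error message describes. */
    static <T> Collector<T> rejectingNulls(List<T> sink) {
        return record -> {
            if (record == null) {
                throw new NullPointerException("null records are not supported");
            }
            sink.add(record);
        };
    }

    /** Chained map step: applies fn and forwards the result within the same call. */
    static <I, O> Collector<I> chainedMap(Function<I, O> fn, Collector<O> next) {
        return record -> next.collect(fn.apply(record));
    }

    /** Null-skipping variant: behaves like a flatMap that emits zero or one record. */
    static <I, O> Collector<I> chainedMapSkippingNulls(Function<I, O> fn, Collector<O> next) {
        return record -> {
            O result = fn.apply(record);
            if (result != null) {
                next.collect(result);
            }
        };
    }

    /** Stand-in for the user's flatMap: emits every non-empty input. */
    static void flatMap(String value, Collector<String> out) {
        if (!value.isEmpty()) {
            out.collect(value); // a downstream null check throws from inside this call
        }
    }

    /** Hypothetical map function that returns null for some inputs. */
    static String extract(String value) {
        return value.startsWith("ok:") ? value.substring(3) : null;
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();

        // Workaround sketch: null results are dropped before reaching the sink.
        Collector<String> safe = chainedMapSkippingNulls(ChainedNullDemo::extract, rejectingNulls(sink));
        flatMap("ok:data", safe);
        flatMap("no-data", safe); // mapped to null and dropped
        System.out.println(sink);

        // Reported behavior: the null reaches the sink and the error surfaces in flatMap.
        Collector<String> unsafe = chainedMap(ChainedNullDemo::extract, rejectingNulls(sink));
        try {
            flatMap("no-data", unsafe);
        } catch (NullPointerException e) {
            System.out.println("thrown inside flatMap: " + e.getMessage());
        }
    }
}
```

      In the job above, the analogous change would be to replace the map step with a flatMap that calls out.collect only for non-null results, or to return a non-null default value from map. Which one fits depends on whether records without "data" should be kept.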
      
      

      Attachments

        1. Lark20200814-173817.png
          237 kB
          Ryan
        2. Lark20200814-173821.png
          174 kB
          Ryan
        3. Lark20200814-173824.png
          101 kB
          Ryan

        Activity

          People

            Assignee: Unassigned
            Reporter: Ryan (kssdxw)