Flink / FLINK-19777

Fix NullPointerException for WindowOperator.close()


Details

    Description

      I run a job with Flink SQL; the table metadata and the SQL are listed below.
      Metadata:

      1> Source: Kafka
      create table metric_source_window_table(
        `metricName` String,
        `namespace` String,
        `timestamp` BIGINT,
        `doubleValue` DOUBLE,
        `longValue` BIGINT,
        `metricsValue` String,
        `tags` MAP<String, String>,
        `meta` Map<String, String>,
        t as TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`/1000,'yyyy-MM-dd HH:mm:ss')),
        WATERMARK FOR t AS t
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'ai-platform',
        'properties.bootstrap.servers' = 'xxx',
        'properties.group.id' = 'metricgroup',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json',
        'json.fail-on-missing-field' = 'false',
        'json.ignore-parse-errors' = 'true'
      )
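The computed column `t` turns the epoch-millisecond `timestamp` field into the rowtime attribute, and `WATERMARK FOR t AS t` emits the maximum rowtime observed so far as the watermark, i.e. no allowance for out-of-order rows. The Java sketch below mirrors the conversion chain outside Flink to show its effect: the value is reduced to seconds, formatted at second precision, and parsed back, so the millisecond part is lost. It assumes the session time zone is passed in explicitly as `zone` (FROM_UNIXTIME expresses its result in the session time zone); `RowtimeSketch` and `rowtime` are hypothetical names, not Flink code.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class RowtimeSketch {
    // Same pattern as the DDL's FROM_UNIXTIME call.
    private static final DateTimeFormatter PATTERN =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * Illustrative equivalent of
     * TO_TIMESTAMP(FROM_UNIXTIME(`timestamp` / 1000, 'yyyy-MM-dd HH:mm:ss')),
     * assuming `zone` is the session time zone.
     */
    static LocalDateTime rowtime(long epochMillis, ZoneId zone) {
        long epochSeconds = epochMillis / 1000;        // milliseconds -> whole seconds
        String text = Instant.ofEpochSecond(epochSeconds)
                .atZone(zone)
                .toLocalDateTime()
                .format(PATTERN);                      // FROM_UNIXTIME: format in the session zone
        return LocalDateTime.parse(text, PATTERN);     // TO_TIMESTAMP: parse the string back
    }

    public static void main(String[] args) {
        // The .089 s of the input does not survive the conversion.
        System.out.println(rowtime(1_603_371_292_089L, ZoneOffset.UTC)); // 2020-10-22T12:54:52
    }
}
```

Because all rows within the same second end up with the same `t`, the watermark also advances in whole-second steps.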

      2> Sink: ClickHouse (using a ClickHouse connector we developed in-house)

      create table flink_metric_window_table(
        `timestamp` BIGINT,
        `longValue` BIGINT,
        `metricName` String,
        `metricsValueSum` DOUBLE,
        `metricsValueMin` DOUBLE,
        `metricsValueMax` DOUBLE,
        `tag_record_id` String,
        `tag_host_ip` String,
        `tag_instance` String,
        `tag_job_name` String,
        `tag_ai_app_name` String,
        `tag_namespace` String,
        `tag_ai_type` String,
        `tag_host_name` String,
        `tag_alarm_domain` String
      ) WITH (
        'connector.type' = 'clickhouse',
        'connector.property-version' = '1',
        'connector.url' = 'jdbc:clickhouse://xxx:8123/dataeye',
        'connector.cluster' = 'ck_cluster',
        'connector.write.flush.max-rows' = '6000',
        'connector.write.flush.interval' = '1000',
        'connector.table' = 'flink_metric_table_all'
      )

      My SQL is:

      insert into
      hive.temp_vipflink.flink_metric_window_table
      select
      cast(HOP_ROWTIME(t, INTERVAL '60' SECOND, INTERVAL '15' MINUTE) AS BIGINT) AS `timestamps`,
      sum(COALESCE( `longValue`, 0)) AS longValue,
      metricName,
      sum(IF(IS_DIGIT(metricsValue), cast(metricsValue AS DOUBLE), 0)) AS metricsValueSum,
      min(IF(IS_DIGIT(metricsValue), cast(metricsValue AS DOUBLE), 0)) AS metricsValueMin,
      max(IF(IS_DIGIT(metricsValue), cast(metricsValue AS DOUBLE), 0)) AS metricsValueMax,
      tags ['record_id'],
      tags ['host_ip'],
      tags ['instance'],
      tags ['job_name'],
      tags ['ai_app_name'],
      tags ['namespace'],
      tags ['ai_type'],
      tags ['host_name'],
      tags ['alarm_domain']
      from
      hive.temp_vipflink.metric_source_window_table
      group by
      metricName,
      tags ['record_id'],
      tags ['host_ip'],
      tags ['instance'],
      tags ['job_name'],
      tags ['ai_app_name'],
      tags ['namespace'],
      tags ['ai_type'],
      tags ['host_name'],
      tags ['alarm_domain'],
      HOP(t, INTERVAL '60' SECOND, INTERVAL '15' MINUTE)
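The plan printed in the log below shows `SlidingGroupWindow('w$, t, 900000, 60000)`, i.e. a 15-minute HOP window that slides every 60 seconds. Since 900000 / 60000 = 15, every input row is assigned to 15 overlapping windows per group-by key, which matters when reasoning about the window operator's state. The Java sketch below illustrates that assignment for half-open windows `[start, start + size)` aligned to the slide with zero offset; it is a simplified illustration, not Flink's window assigner, and `HopWindowSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class HopWindowSketch {
    // Parameters from the plan: size 900000 ms (15 min), slide 60000 ms (60 s).
    static final long SIZE_MS = 900_000L;
    static final long SLIDE_MS = 60_000L;

    /** Start of the latest slide-aligned window (zero offset) that contains ts. */
    static long lastWindowStart(long ts, long slide) {
        return ts - Math.floorMod(ts, slide);
    }

    /** Starts of every window [start, start + size) that contains ts, newest first. */
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // A window contains ts iff start <= ts < start + size, i.e. start > ts - size.
        for (long start = lastWindowStart(ts, slide); start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long ts = 1_603_371_292_000L; // an arbitrary event time in epoch milliseconds
        System.out.println(windowStarts(ts, SIZE_MS, SLIDE_MS).size()); // 15
    }
}
```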

       

      When I run this SQL for several hours, the following exception appears:

      [2020-10-22 20:54:52.089] [ERROR] [GroupWindowAggregate(groupBy=[metricName, $f1, $f2, $f3, $f4, $f5, $f6, $f7, $f8, $f9], window=[SlidingGroupWindow('w$, t, 900000, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[metricName, $f1, $f2, $f3, $f4, $f5, $f6, $f7, $f8, $f9, SUM($f11) AS longValue, SUM($f12) AS metricsValueSum, MIN($f12) AS metricsValueMin, MAX($f12) AS metricsValueMax, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) -> Calc(select=[CAST(CAST(w$rowtime)) AS timestamps, longValue, metricName, metricsValueSum, metricsValueMin, metricsValueMax, $f1 AS EXPR$6, $f2 AS EXPR$7, $f3 AS EXPR$8, $f4 AS EXPR$9, $f5 AS EXPR$10, $f6 AS EXPR$11, $f7 AS EXPR$12, $f8 AS EXPR$13, $f9 AS EXPR$14]) -> SinkConversionToTuple2 -> Sink: JdbcUpsertTableSink(timestamp, longValue, metricName, metricsValueSum, metricsValueMin, metricsValueMax, tag_record_id, tag_host_ip, tag_instance, tag_job_name, tag_ai_app_name, tag_namespace, tag_ai_type, tag_host_name, tag_alarm_domain) (23/44)] [org.apache.flink.streaming.runtime.tasks.StreamTask] >>> Error during disposal of stream operator. 
      java.lang.NullPointerException: null
          at org.apache.flink.table.runtime.operators.window.WindowOperator.dispose(WindowOperator.java:318) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
          at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:729) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
          at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:645) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
          at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
          at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
          at java.lang.Thread.run(Thread.java:748) [?:1.8.0_262]

       

      Eventually the task reports this error and the job fails.
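The trace shows the NullPointerException thrown from `WindowOperator.dispose` while `StreamTask.cleanUpInvoke` was disposing all operators. One plausible explanation, which this report does not confirm, is that dispose() dereferenced a field that is normally initialized in open() and was still null because initialization never completed. The Java sketch below illustrates that failure mode and a null-guarded teardown; `SketchOperator`, `windowFunction` and `functionsClosed` are hypothetical names, and this is not Flink's actual WindowOperator code.

```java
public class NullSafeDisposeSketch {

    /** Hypothetical operator whose resource is only created in open(). */
    static class SketchOperator {
        private AutoCloseable windowFunction; // stays null if open() fails early
        private boolean functionsClosed = false;

        void open(boolean failBeforeInit) {
            if (failBeforeInit) {
                throw new IllegalStateException("simulated failure before windowFunction is created");
            }
            windowFunction = () -> { };
        }

        /** Normal shutdown path. */
        void close() throws Exception {
            functionsClosed = true;
            if (windowFunction != null) {
                windowFunction.close();
            }
        }

        /** Cleanup path; in this sketch it can run even though open() never completed. */
        void dispose() throws Exception {
            if (!functionsClosed) {
                functionsClosed = true;
                // Without this null check, dispose() after a failed open() throws NullPointerException.
                if (windowFunction != null) {
                    windowFunction.close();
                }
            }
        }
    }

    /** Simulates a failed open() followed by cleanup; returns true if dispose() completed. */
    static boolean disposeAfterFailedOpenSucceeds() {
        SketchOperator operator = new SketchOperator();
        try {
            operator.open(true);
        } catch (IllegalStateException expected) {
            // The task would now enter its cleanup path.
        }
        try {
            operator.dispose();
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(disposeAfterFailedOpenSucceeds()); // true
    }
}
```

Such a guard only makes teardown tolerant of a partially initialized operator; if the task was already failing for another reason, that root cause would still need to be found separately, and the report does not show it.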

            People

              Jark Wu (jark)
              frank wang