FLINK-25916

Using upsert-kafka with a flush buffer results in Null Pointer Exception



    Description

      Flink version: 1.14.3
      upsert-kafka version: 1.14.3

       

      I have been trying to buffer output from the upsert-kafka connector using the documented parameters sink.buffer-flush.max-rows and sink.buffer-flush.interval (the documentation notes that both must be set to values greater than zero for buffering to take effect).

      Whenever I attempt to run an INSERT query with buffering, I receive the following error (shortened for brevity):

      Caused by: java.lang.NullPointerException
              at org.apache.flink.streaming.connectors.kafka.table.ReducingUpsertWriter.flush(ReducingUpsertWriter.java:145) 
              at org.apache.flink.streaming.connectors.kafka.table.ReducingUpsertWriter.lambda$registerFlush$3(ReducingUpsertWriter.java:124) 
              at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693) 
              at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684) 
              at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) 
              at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) 
              at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) 
              at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) 
              at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) 
              at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) 
              at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) 
              at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) 
              at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) 
              at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) 
              at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) 
              at java.lang.Thread.run(Thread.java:829) [?:?] 
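
      Judging by the trace, the NPE fires inside the timed flush registered by ReducingUpsertWriter (the callback arrives through StreamTask.invokeProcessingTimeCallback), so the windowed aggregation below does not seem to be essential; any INSERT I run with the buffering options set fails the same way. As an untested, stripped-down sketch (datagen source; the broker address, topic, and table names are placeholders invented for illustration), something like this should exercise the same flush path:

      -- Hypothetical minimal case, not the exact job from this report:
      -- a datagen source feeding an upsert-kafka sink with buffering enabled.
      CREATE TABLE repro_source (
          `id`  BIGINT,
          `val` BIGINT
      ) WITH (
          'connector' = 'datagen',
          'rows-per-second' = '100'
      );

      CREATE TABLE repro_sink (
          `id`  BIGINT,
          `val` BIGINT,
          PRIMARY KEY (`id`) NOT ENFORCED
      ) WITH (
          'connector' = 'upsert-kafka',
          'topic' = 'repro_topic',
          'properties.bootstrap.servers' = 'brokers.example.com:9092',
          'key.format' = 'json',
          'value.format' = 'json',
          'sink.buffer-flush.max-rows' = '50000',
          'sink.buffer-flush.interval' = '1000'
      );

      INSERT INTO repro_sink SELECT `id`, `val` FROM repro_source;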

       

      If I remove the parameters related to flush buffering, then everything works as expected with no problems at all. For reference, here is the full setup with source, destination, and queries. Yes, I realize the INSERT could use an overhaul, but that's not the issue at hand.

      CREATE TABLE `source_topic` (
          `timeGMT` INT,
          `eventtime` AS TO_TIMESTAMP(FROM_UNIXTIME(`timeGMT`)),
          `visIdHigh` BIGINT,
          `visIdLow` BIGINT,
          `visIdStr` AS CONCAT(IF(`visIdHigh` IS NULL, '', CAST(`visIdHigh` AS STRING)), IF(`visIdLow` IS NULL, '', CAST(`visIdLow` AS STRING))),
          WATERMARK FOR eventtime AS eventtime - INTERVAL '25' SECONDS
      ) WITH (
          'connector' = 'kafka',
          'properties.group.id' = 'flink_metrics',
          'properties.bootstrap.servers' = 'brokers.example.com:9093',
          'topic' = 'source_topic',
          'scan.startup.mode' = 'earliest-offset',
          'value.format' = 'avro-confluent',
          'value.avro-confluent.url' = 'http://schema.example.com',
          'value.fields-include' = 'EXCEPT_KEY'
      );
      
      
      CREATE TABLE dest_topic (
          `messageType` VARCHAR,
          `observationID` BIGINT,
          `obsYear` BIGINT,
          `obsMonth` BIGINT,
          `obsDay` BIGINT,
          `obsHour` BIGINT,
          `obsMinute` BIGINT,
          `obsTz` VARCHAR(5),
          `value` BIGINT,
          PRIMARY KEY (observationID, messageType) NOT ENFORCED
      ) WITH (
          'connector' = 'upsert-kafka',
          'key.format' = 'json',
          'properties.bootstrap.servers' = 'brokers.example.com:9092',
          'sink.buffer-flush.max-rows' = '50000',
          'sink.buffer-flush.interval' = '1000',
          'topic' = 'dest_topic',
          'value.format' = 'json'
      );
      
      INSERT INTO dest_topic
          SELECT `messageType`, `observationID`, obsYear, obsMonth, obsDay, obsHour, obsMinute, obsTz, SUM(`value`) AS `value` FROM (
              SELECT `messageType`, `observationID`, obsYear, obsMonth, obsDay, obsHour, obsMinute, '-0000' AS obsTz, 1 AS `value`, `visIdStr` FROM (
                  SELECT
                      'visit' AS `messageType`,
                      CAST(DATE_FORMAT(window_start, 'yyyyMMddHHmm') AS BIGINT) AS `observationID`,
                      year(window_start) AS obsYear,
                      month(window_start) AS obsMonth,
                      dayofmonth(window_start) AS obsDay,
                      hour(window_start) AS obsHour,
                      minute(window_start) AS obsMinute,
                      '-0000' AS obsTz,
                      visIdStr
                  FROM TABLE(TUMBLE(TABLE `source_topic`, DESCRIPTOR(`eventtime`), INTERVAL '60' SECONDS))
                  WHERE visIdStr IS NOT NULL
                  GROUP BY window_start, window_end, visIdStr
              )
              GROUP BY messageType, observationID, obsYear, obsMonth, obsDay, obsHour, obsMinute, `visIdStr`
          )
          GROUP BY messageType, observationID, obsYear, obsMonth, obsDay, obsHour, obsMinute, obsTz;
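
      For completeness, here is the same sink definition with only the two buffer-flush options removed; as noted above, with this definition everything works as expected with no problems at all:

      -- Identical to dest_topic above except that the two sink.buffer-flush.*
      -- options are removed; with this definition the INSERT runs cleanly.
      CREATE TABLE dest_topic (
          `messageType` VARCHAR,
          `observationID` BIGINT,
          `obsYear` BIGINT,
          `obsMonth` BIGINT,
          `obsDay` BIGINT,
          `obsHour` BIGINT,
          `obsMinute` BIGINT,
          `obsTz` VARCHAR(5),
          `value` BIGINT,
          PRIMARY KEY (observationID, messageType) NOT ENFORCED
      ) WITH (
          'connector' = 'upsert-kafka',
          'key.format' = 'json',
          'properties.bootstrap.servers' = 'brokers.example.com:9092',
          'topic' = 'dest_topic',
          'value.format' = 'json'
      );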

       

       

    People

      Assignee: Unassigned
      Reporter: Corey Shaw (heaje)
      Votes: 3
      Watchers: 16
