Details
Type: Bug
Status: Closed
Priority: Critical
Resolution: Fixed
Versions: 1.14.3, 1.15.0
Environment: CentOS 7.9 x64, Intel Xeon Gold 6140 CPU
Description
Flink Version: 1.14.3
upsert-kafka version: 1.14.3
I have been trying to buffer output from the upsert-kafka connector using the documented sink options sink.buffer-flush.max-rows and sink.buffer-flush.interval.
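According to the upsert-kafka connector documentation, when these options are set the sink holds records in memory, keeps only the latest record for each primary key, and writes the buffer out when the row limit is reached or when the flush interval elapses. The Java sketch below models that behaviour under simplifying assumptions. ReducingBuffer, write, onTimer and emitted are hypothetical names, and the emitted list stands in for the real Kafka writer. It is not the connector's actual implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of a buffering upsert sink (not the Flink source).
 * Records are reduced by key in memory and handed downstream only on flush.
 */
class ReducingBuffer<K, V> {
    private final int maxRows;       // plays the role of sink.buffer-flush.max-rows
    private final long intervalMs;   // plays the role of sink.buffer-flush.interval
    private final Map<K, V> buffer = new LinkedHashMap<>();
    private final List<V> emitted = new ArrayList<>(); // stands in for the Kafka writer
    private long lastFlushMs;

    ReducingBuffer(int maxRows, long intervalMs, long nowMs) {
        this.maxRows = maxRows;
        this.intervalMs = intervalMs;
        this.lastFlushMs = nowMs;
    }

    /** Buffers a record; a newer record with the same key replaces the older one. */
    void write(K key, V value, long nowMs) {
        buffer.put(key, value);
        if (buffer.size() >= maxRows) {
            flush(nowMs); // size-triggered flush
        }
    }

    /** Invoked by a periodic processing-time timer; flushes once the interval has elapsed. */
    void onTimer(long nowMs) {
        if (nowMs - lastFlushMs >= intervalMs) {
            flush(nowMs); // time-triggered flush
        }
    }

    private void flush(long nowMs) {
        emitted.addAll(buffer.values());
        buffer.clear();
        lastFlushMs = nowMs;
    }

    /** Returns a copy of everything written downstream so far. */
    List<V> emitted() {
        return new ArrayList<>(emitted);
    }
}
```

In this model, a key updated several times within one interval produces a single downstream record. The time-based flush runs from a timer callback rather than from write(), so it executes without an incoming record at hand.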
Whenever I run an INSERT query with buffering enabled, I receive the following error (truncated):
Caused by: java.lang.NullPointerException
    at org.apache.flink.streaming.connectors.kafka.table.ReducingUpsertWriter.flush(ReducingUpsertWriter.java:145)
    at org.apache.flink.streaming.connectors.kafka.table.ReducingUpsertWriter.lambda$registerFlush$3(ReducingUpsertWriter.java:124)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:829) [?:?]
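The trace shows the exception is thrown inside ReducingUpsertWriter.flush while it runs as a processing-time timer callback (invokeProcessingTimeCallback calling lambda$registerFlush$3), i.e. the interval-based flush. The report does not identify which reference is null. One common way Java code throws NullPointerException in such a loop is auto-unboxing a null wrapper value, for example a record timestamp that is absent, into a primitive. The sketch below illustrates only that general pattern. BufferedRow, UnsafeContext, SafeContext and FlushDemo are hypothetical classes, not Flink source, and this is an assumption rather than a confirmed root cause.

```java
import java.util.Map;

/** Hypothetical buffered row whose event timestamp may be absent (null). */
final class BufferedRow {
    final String value;
    final Long timestamp; // null when the incoming record carried no timestamp

    BufferedRow(String value, Long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

/** Stores the timestamp as a primitive long, so "no timestamp" cannot be represented. */
class UnsafeContext {
    private long timestamp;

    void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }
}

/** Keeps the timestamp nullable. */
class SafeContext {
    private Long timestamp;

    void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    Long timestamp() {
        return timestamp;
    }
}

class FlushDemo {
    /** Throws NullPointerException as soon as a buffered row has a null timestamp. */
    static void flushUnsafe(Map<String, BufferedRow> buffer) {
        UnsafeContext ctx = new UnsafeContext();
        for (BufferedRow row : buffer.values()) {
            ctx.setTimestamp(row.timestamp); // auto-unboxing a null Long throws here
            // a real sink would hand the row to the downstream writer at this point
        }
        buffer.clear();
    }

    /** Passes the nullable timestamp through unchanged; returns the number of rows flushed. */
    static int flushSafe(Map<String, BufferedRow> buffer) {
        SafeContext ctx = new SafeContext();
        int flushed = 0;
        for (BufferedRow row : buffer.values()) {
            ctx.setTimestamp(row.timestamp); // null stays null, no unboxing
            flushed++;
        }
        buffer.clear();
        return flushed;
    }
}
```

In this sketch, keeping the timestamp as a nullable Long, as SafeContext does, avoids the unboxing and lets the flush complete.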
If I remove the buffer-flush parameters, everything works as expected. For reference, here is the full setup: source table, destination table, and query. Yes, I realize the INSERT could use an overhaul, but that's not the issue at hand.
CREATE TABLE `source_topic` (
  `timeGMT` INT,
  `eventtime` AS TO_TIMESTAMP(FROM_UNIXTIME(`timeGMT`)),
  `visIdHigh` BIGINT,
  `visIdLow` BIGINT,
  `visIdStr` AS CONCAT(
    IF(`visIdHigh` IS NULL, '', CAST(`visIdHigh` AS STRING)),
    IF(`visIdLow` IS NULL, '', CAST(`visIdLow` AS STRING))),
  WATERMARK FOR eventtime AS eventtime - INTERVAL '25' SECONDS
) WITH (
  'connector' = 'kafka',
  'properties.group.id' = 'flink_metrics',
  'properties.bootstrap.servers' = 'brokers.example.com:9093',
  'topic' = 'source_topic',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://schema.example.com',
  'value.fields-include' = 'EXCEPT_KEY'
);

CREATE TABLE dest_topic (
  `messageType` VARCHAR,
  `observationID` BIGINT,
  `obsYear` BIGINT,
  `obsMonth` BIGINT,
  `obsDay` BIGINT,
  `obsHour` BIGINT,
  `obsMinute` BIGINT,
  `obsTz` VARCHAR(5),
  `value` BIGINT,
  PRIMARY KEY (observationID, messageType) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'key.format' = 'json',
  'properties.bootstrap.servers' = 'brokers.example.com:9092',
  'sink.buffer-flush.max-rows' = '50000',
  'sink.buffer-flush.interval' = '1000',
  'topic' = 'dest_topic',
  'value.format' = 'json'
);

INSERT INTO dest_topic
SELECT `messageType`, `observationID`, obsYear, obsMonth, obsDay, obsHour, obsMinute, obsTz,
       SUM(`value`) AS `value`
FROM (
  SELECT `messageType`, `observationID`, obsYear, obsMonth, obsDay, obsHour, obsMinute,
         '-0000' AS obsTz, 1 AS `value`, `visIdStr`
  FROM (
    SELECT 'visit' AS `messageType`,
           CAST(DATE_FORMAT(window_start, 'yyyyMMddHHmm') AS BIGINT) AS `observationID`,
           year(window_start) AS obsYear,
           month(window_start) AS obsMonth,
           dayofmonth(window_start) AS obsDay,
           hour(window_start) AS obsHour,
           minute(window_start) AS obsMinute,
           '-0000' AS obsTz,
           visIdStr
    FROM TABLE(TUMBLE(TABLE `source_topic`, DESCRIPTOR(`eventtime`), INTERVAL '60' SECONDS))
    WHERE visIdStr IS NOT NULL
    GROUP BY window_start, window_end, visIdStr
  )
  GROUP BY messageType, observationID, obsYear, obsMonth, obsDay, obsHour, obsMinute, `visIdStr`
)
GROUP BY messageType, observationID, obsYear, obsMonth, obsDay, obsHour, obsMinute, obsTz;