Flink / FLINK-18862

Fix LISTAGG throwing a "BinaryRawValueData cannot be cast to StringData" exception at runtime


Details

    Description

      1. Env: Flink SQL, version 1.11.1, per-job mode
      2. Error: org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to org.apache.flink.table.data.StringData

      3. Job:

      (1) Create a Kafka table:

          CREATE TABLE kafka (
              x STRING,
              y STRING
          ) WITH (
              'connector' = 'kafka',
              ......
          )
      

      (2) Create a view:

         CREATE VIEW view1 AS
         SELECT 
             x, 
             y, 
             CAST(COUNT(1) AS VARCHAR) AS ct
         FROM kafka
         GROUP BY 
             x, y
      

      (3) Aggregate on the view:

          SELECT
              x,
              LISTAGG(CONCAT_WS('=', y, ct), ',') AS lists
          FROM view1
          GROUP BY x
      

      The following exception is then thrown: org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to org.apache.flink.table.data.StringData
      This is unexpected because the query does not use any RAW type: COUNT(1) returns BIGINT (cast to VARCHAR in the view), so no RawValueData should be involved.

      (4) Without the aggregation, the job runs successfully:

          SELECT
              x,
              CONCAT_WS('=', y, ct)
          FROM view1
      

      The detailed exception:

      java.lang.ClassCastException: org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to org.apache.flink.table.data.StringData
      	at org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.table.data.JoinedRowData.getString(JoinedRowData.java:139) ~[flink-table-blink_2.11-1.11.1.jar:?]
      	at org.apache.flink.table.data.RowData.get(RowData.java:273) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:156) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
      	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:123) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
      	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:205) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
      	at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
      	at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [ad_features_auto-1.0-SNAPSHOT.jar:?]
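The top frame shows where the failure surfaces: a row stores its fields as plain `Object`s, and the typed getter for a STRING column casts the stored value without checking it, so a RAW value sitting in a slot that the plan declares as STRING only fails at read time. The sketch below reproduces that pattern in isolation. All classes in it (`MiniRow`, `MiniStringData`, `MiniBinaryString`, `MiniRawValue`) are hypothetical stand-ins, not Flink's API; it illustrates the mechanism behind the `ClassCastException` and makes no claim about why the planner ends up producing a RAW value for this query.

```java
// Hypothetical, self-contained stand-ins for Flink's internal row/data classes.
// Only the unchecked-cast pattern of a typed getter is being illustrated.
public class RawCastDemo {

    /** Hypothetical stand-in for a string data interface. */
    interface MiniStringData {
        String asJavaString();
    }

    /** Hypothetical stand-in for a binary STRING value. */
    static final class MiniBinaryString implements MiniStringData {
        private final String value;
        MiniBinaryString(String value) { this.value = value; }
        @Override public String asJavaString() { return value; }
    }

    /** Hypothetical stand-in for a RAW value: opaque, and NOT string data. */
    static final class MiniRawValue {
        private final Object payload;
        MiniRawValue(Object payload) { this.payload = payload; }
        Object payload() { return payload; }
    }

    /** Hypothetical generic row: fields stored as Object, typed getters cast. */
    static final class MiniRow {
        private final Object[] fields;
        MiniRow(Object... fields) { this.fields = fields; }

        // Trusts the declared column type and casts unconditionally,
        // so a type mismatch only shows up at runtime as ClassCastException.
        MiniStringData getString(int pos) {
            return (MiniStringData) fields[pos];
        }
    }

    /** Returns true if reading field `pos` as a string throws ClassCastException. */
    static boolean getStringFails(MiniRow row, int pos) {
        try {
            row.getString(pos);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Column 1 is treated as STRING and really holds string data: read succeeds.
        MiniRow ok = new MiniRow(new MiniBinaryString("x1"), new MiniBinaryString("y1=3"));
        // Column 1 is treated as STRING but holds a RAW value: read fails.
        MiniRow bad = new MiniRow(new MiniBinaryString("x1"), new MiniRawValue("y1=3"));

        System.out.println("ok row fails:  " + getStringFails(ok, 1));
        System.out.println("bad row fails: " + getStringFails(bad, 1));
    }
}
```

Running `main` prints `false` for the first row and `true` for the second, mirroring how the failure in the report appears only when a serializer copies the row and calls the STRING getter on the mismatched field.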
      


            People

              Jark Wu (jark)
              YUJIANBO
