FLINK-18862

Fix LISTAGG throwing "BinaryRawValueData cannot be cast to StringData" exception at runtime

    Details

      Description

      1. Env: Flink SQL, version 1.11.1, per-job mode
      2. Error: org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to org.apache.flink.table.data.StringData

      3. Job:

      (1) Create a Kafka table:

          CREATE TABLE kafka (
              x STRING,
              y STRING
          ) WITH (
              'connector' = 'kafka',
              ......
          )
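
      To try the steps below without a Kafka cluster, a stand-in source could look like the following sketch. It assumes the built-in `datagen` connector (available since Flink 1.11) in place of the elided Kafka options. The table name `kafka` is kept only so that the later statements apply unchanged, and the option values are arbitrary. Whether randomly generated rows trigger the same failure has not been verified here.

```sql
-- Assumption: datagen stands in for the Kafka source; option values are arbitrary.
-- Length-1 strings are intended to make the same group keys recur.
CREATE TABLE kafka (
    x STRING,
    y STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '5',
    'fields.x.length' = '1',
    'fields.y.length' = '1'
)
```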
      

      (2) Create a view:

         CREATE VIEW view1 AS
         SELECT 
             x, 
             y, 
             CAST(COUNT(1) AS VARCHAR) AS ct
         FROM kafka
         GROUP BY 
             x, y
      

      (3) Aggregate on the view:

          SELECT
              x,
              LISTAGG(CONCAT_WS('=', y, ct), ',') AS lists
          FROM view1
          GROUP BY x
      

      The job then fails with: org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to org.apache.flink.table.data.StringData
      This is unexpected because the query contains no RAW-typed value. The result type of COUNT(1) should be BIGINT, not RawValueData.
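
      One way to narrow this down is to look at the plan the planner produces for the failing query. The sketch below assumes the `EXPLAIN PLAN FOR` statement available in Flink 1.11 (for example via `TableEnvironment.executeSql`) and the `view1` definition from step (2). It only prints the plan and does not run the job. The printed plan should show which aggregate call the planner chose for LISTAGG on top of the updating view.

```sql
-- Assumption: view1 from step (2) is already registered in the current catalog.
EXPLAIN PLAN FOR
SELECT
    x,
    LISTAGG(CONCAT_WS('=', y, ct), ',') AS lists
FROM view1
GROUP BY x
```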

      (4) Without the aggregation, the job runs successfully:

          SELECT
              x,
              CONCAT_WS('=', y, ct)
          FROM view1
      

      The detailed exception:

      java.lang.ClassCastException: org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to org.apache.flink.table.data.StringData
      	at org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.table.data.JoinedRowData.getString(JoinedRowData.java:139) ~[flink-table-blink_2.11-1.11.1.jar:?]
      	at org.apache.flink.table.data.RowData.get(RowData.java:273) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:156) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
      	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:123) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
      	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:205) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
      	at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
      	at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [ad_features_auto-1.0-SNAPSHOT.jar:?]
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [ad_features_auto-1.0-SNAPSHOT.jar:?]
      

              People

              • Assignee: Jark Wu (jark)
              • Reporter: YUJIANBO
              • Votes: 0
              • Watchers: 8
