FLINK-21247

flink iceberg table map<string,string> cannot convert to datastream


Details

    Description

      Flink Iceberg Table with map<string,string>

       
      We want to read the table like this:

      String querySql = "SELECT ftime,extinfo,country,province,operator,apn,gw,src_ip_head,info_str,product_id,app_version,sdk_id,sdk_version,hardware_os,qua,upload_ip,client_ip,upload_apn,event_code,event_result,package_size,consume_time,event_value,event_time,upload_time,boundle_id,uin,platform,os_version,channel,brand,model from bfzt3";
      Table table = tEnv.sqlQuery(querySql);

      DataStream<AttaInfo> sinkStream = tEnv.toAppendStream(table, Types.POJO(AttaInfo.class, map));

      sinkStream.map(x -> 1)
          .returns(Types.INT)
          .keyBy(new NullByteKeySelector<Integer>())
          .reduce((x, y) -> x + y)
          .print();
       
       
      When reading, we hit the following exception:
       
      2021-02-03 15:37:57
      java.lang.ClassCastException: org.apache.iceberg.flink.data.FlinkParquetReaders$ReusableMapData cannot be cast to org.apache.flink.table.data.binary.BinaryMapData
          at org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:107)
          at org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:47)
          at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:166)
          at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:129)
          at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
          at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
          at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
          at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
          at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
          at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
          at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
          at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
          at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
          at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
          at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
          at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
       
      We find that the Iceberg map type is ReusableMapData, which implements MapData (see the attached screenshots).

      This is where the exception comes from: MapData has two default implementations, GenericMapData and BinaryMapData, but the implementation returned by Iceberg is ReusableMapData, and MapDataSerializer.copy casts the incoming MapData to BinaryMapData unconditionally.

      So we think that code should be changed to handle any MapData implementation, as shown in the attached screenshots.
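Since the proposed change itself only appears in the attached screenshots, here is a minimal, self-contained Java sketch of the failure mode and one possible defensive fix. MapData, BinaryMapData, ReusableMapData, and MapCopySketch below are simplified stand-ins invented for illustration, not the real Flink or Iceberg classes:

```java
import java.util.HashMap;
import java.util.Map;

public class MapCopySketch {

    // Stand-in for org.apache.flink.table.data.MapData.
    interface MapData {
        int size();
    }

    // Stand-in for Flink's BinaryMapData, one of the two default implementations.
    static class BinaryMapData implements MapData {
        final Map<String, String> entries;
        BinaryMapData(Map<String, String> entries) { this.entries = entries; }
        @Override public int size() { return entries.size(); }
        BinaryMapData binaryCopy() { return new BinaryMapData(new HashMap<>(entries)); }
    }

    // Stand-in for Iceberg's FlinkParquetReaders$ReusableMapData:
    // it implements MapData but is NOT a BinaryMapData.
    static class ReusableMapData implements MapData {
        final Map<String, String> entries;
        ReusableMapData(Map<String, String> entries) { this.entries = entries; }
        @Override public int size() { return entries.size(); }
    }

    // The failing pattern: an unconditional downcast, like the one the
    // stack trace shows inside MapDataSerializer.copy.
    static MapData copyUnconditionalCast(MapData from) {
        return ((BinaryMapData) from).binaryCopy(); // throws ClassCastException for ReusableMapData
    }

    // The direction of the proposed fix: dispatch on the actual type and
    // fall back to a generic deep copy for other MapData implementations.
    static MapData copyDefensively(MapData from) {
        if (from instanceof BinaryMapData) {
            return ((BinaryMapData) from).binaryCopy();
        }
        // In this sketch there are only two implementations.
        ReusableMapData r = (ReusableMapData) from;
        return new ReusableMapData(new HashMap<>(r.entries));
    }

    public static void main(String[] args) {
        Map<String, String> extinfo = new HashMap<>();
        extinfo.put("country", "CN");

        MapData fromIceberg = new ReusableMapData(extinfo);

        boolean castFailed = false;
        try {
            copyUnconditionalCast(fromIceberg);
        } catch (ClassCastException e) {
            castFailed = true; // same failure mode as in the stack trace above
        }
        System.out.println("unconditional cast failed: " + castFailed);

        MapData copied = copyDefensively(fromIceberg);
        System.out.println("defensive copy size: " + copied.size());
    }
}
```

The point of the sketch is that a serializer copy path should dispatch on the MapData interface rather than assume one concrete implementation, since connectors such as Iceberg are free to supply their own.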

       

      Attachments

        1. image-2021-02-03-15-38-42-340.png
          62 kB
          donglei
        2. image-2021-02-03-15-40-27-055.png
          111 kB
          donglei
        3. image-2021-02-03-15-41-34-426.png
          26 kB
          donglei
        4. image-2021-02-03-15-43-19-919.png
          22 kB
          donglei
        5. image-2021-02-03-15-52-12-493.png
          62 kB
          donglei
        6. image-2021-02-03-15-53-18-244.png
          63 kB
          donglei


          People

            Assignee: icshuo (Shuo Cheng)
            Reporter: txdong-sz (donglei)
            Votes: 0
            Watchers: 7
