  1. Flink
  2. FLINK-20181

RowData cannot cast to Tuple2


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Table SQL / Planner
    • Labels: None

    Description

      I want to emit CDC data from my own StreamOperator.

      Flink version: 1.11.2, Blink planner.

          getTableEnv().registerTableSource(
              "source",
              new StreamTableSource<RowData>() {
                TableSchema tableSchema = TableSchema.builder()
                    .field("id", new AtomicDataType(new IntType(false)))
                    .field("name", DataTypes.STRING())
                    .field("type", DataTypes.STRING())
                    .primaryKey("id")
                    .build();

                @Override
                public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
                  return execEnv.addSource(new DebugSourceFunction(tableSchema.toRowDataType()));
                }

                @Override
                public TableSchema getTableSchema() {
                  return tableSchema;
                }

                @Override
                public DataType getProducedDataType() {
                  return getTableSchema().toRowDataType().bridgedTo(RowData.class);
                }
              }
          );
          sql("insert into Test.testdb.animal "
                  + " SELECT id, name, type, '2020' as da, '11' as hr"
                  + " from source"
          );  
      
      class DebugSourceFunction extends RichParallelSourceFunction<RowData> implements ResultTypeQueryable<RowData> {
          DataType dataType;

          public DebugSourceFunction(DataType dataType) {
            this.dataType = dataType;
          }

          @Override
          public TypeInformation<RowData> getProducedType() {
            return (TypeInformation<RowData>) createTypeInformation(dataType);
          }

          @Override
          public void run(SourceContext<RowData> ctx) throws Exception {
            ctx.collect(GenericRowData.ofKind(RowKind.INSERT, 1, StringData.fromString("monkey"), StringData.fromString("small")));
          }

          @Override
          public void cancel() {
          }

          public TypeInformation<?> createTypeInformation(DataType producedDataType) {
            final DataType internalDataType = DataTypeUtils.transform(
                producedDataType,
                TypeTransformations.TO_INTERNAL_CLASS);
            return fromDataTypeToTypeInfo(internalDataType);
          }
      }
      
      public class TestUpsertTableSink implements UpsertStreamTableSink<RowData>, OverwritableTableSink, PartitionableTableSink {
          @Override
          public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, RowData>> dataStream) {
            DataStream<Void> returnStream = dataStream
                .map(
                    (MapFunction<Tuple2<Boolean, RowData>, RowData>)
                        value -> value.f1
                )
                ......
            return returnStream
                .addSink(new DiscardingSink<>())
                .setParallelism(1);
          }
      }
      

      When I execute the `insert into ...` statement, it fails with a ClassCastException:

      Caused by: java.lang.ClassCastException: org.apache.flink.table.data.GenericRowData cannot be cast to org.apache.flink.api.java.tuple.Tuple2
          at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
          at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
          at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
          at StreamExecCalc$8.processElement(Unknown Source)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
          at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
          at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
          at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
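
      The trace shows a GenericRowData element reaching the map function that was declared over Tuple2<Boolean, RowData>. The mismatch appears only at runtime, most likely because the element type exists only as a generic parameter and is erased at compile time. The following is a minimal sketch of that mechanism in plain Java, not Flink code: MapOperator and ErasureCastDemo are hypothetical stand-ins, and Map.Entry plays the role of Tuple2.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.function.Function;

public class ErasureCastDemo {

    // Hypothetical stand-in for an operator that wraps a user function.
    // The input type IN is known only through generics, so it is erased at runtime.
    static final class MapOperator<IN, OUT> {
        private final Function<IN, OUT> userFunction;

        MapOperator(Function<IN, OUT> userFunction) {
            this.userFunction = userFunction;
        }

        @SuppressWarnings("unchecked")
        OUT processElement(Object element) {
            // Unchecked cast: nothing is verified here because IN erases to Object.
            // The real type check happens when the lambda is invoked.
            return userFunction.apply((IN) element);
        }
    }

    // Runs a function declared over Map.Entry<Boolean, String> against an arbitrary element.
    static String run(Object element) {
        MapOperator<Map.Entry<Boolean, String>, String> operator =
            new MapOperator<>(value -> value.getValue());
        try {
            return operator.processElement(element);
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // The element matches the declared input type, so the function succeeds.
        System.out.println(run(new SimpleEntry<>(true, "monkey"))); // prints "monkey"
        // The element is a bare value, analogous to a GenericRowData that is not wrapped in Tuple2.
        System.out.println(run("monkey")); // prints "ClassCastException"
    }
}
```

      The sketch compiles cleanly because the compiler trusts the declared type parameters. It shows only why the error surfaces inside the operator at runtime, not why the planner passes an unwrapped row to this sink.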
      


          People

            Assignee: Unassigned
            Reporter: Xianxun Ye (yesorno)
            Votes: 0
            Watchers: 4
