Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 1.13.2
Description
code:
Table table = tbEnv.fromValues(
    DataTypes.ROW(
        DataTypes.FIELD("innerPojo", DataTypes.ROW(DataTypes.FIELD("c", STRING()))),
        DataTypes.FIELD("b", STRING()),
        DataTypes.FIELD("a", INT())),
    Row.of(Row.of("str-c"), "str-b", 1));
DataStream<Pojo> pojoDataStream = tbEnv.toAppendStream(table, Pojo.class);
-----------------------------
public static class Pojo {
    public InnerPojo innerPojo;
    public String b;
    public int a;
    public Pojo() { }
}
public static class InnerPojo {
    public String c;
    public InnerPojo() { }
}
error:
java.lang.ClassCastException: org.apache.flink.table.types.logical.IntType cannot be cast to org.apache.flink.table.types.logical.RowType
    at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$1.apply(TableSinkUtils.scala:163)
    at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$1.apply(TableSinkUtils.scala:155)
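The fields of PojoTypeInfo are in alphabetical order, so in `expandPojoTypeToSchema`, 'pojoType' and 'queryLogicalType' should each have their own index. Currently the POJO field index is used to look up the field in 'queryLogicalType', which causes the field type mismatch. In the example above, the POJO fields are ordered (a, b, innerPojo) while the query fields are ordered (innerPojo, b, a), so the POJO index of 'innerPojo' (2) points at the INT field 'a' in the query type, and the cast to RowType fails. It should be fixed like this: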
val queryIndex = queryLogicalType.getFieldIndex(name)
val nestedLogicalType = queryLogicalType.getFields()(queryIndex).getType.asInstanceOf[RowType]
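
To make the index mismatch easier to see without a Flink setup, here is a minimal standalone sketch. It is not the planner code: the class `FieldIndexSketch`, its methods, and the plain string type names are hypothetical stand-ins, and the alphabetical sort only mimics the PojoTypeInfo field ordering described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Flink.
public class FieldIndexSketch {
    // Query schema in declaration order, as in the fromValues(...) call above.
    static final List<String> QUERY_NAMES = Arrays.asList("innerPojo", "b", "a");
    static final List<String> QUERY_TYPES = Arrays.asList("ROW", "STRING", "INT");

    // POJO field names sorted alphabetically, mimicking the PojoTypeInfo ordering.
    static List<String> pojoFieldNames() {
        List<String> sorted = new ArrayList<>(QUERY_NAMES);
        Collections.sort(sorted);
        return sorted;
    }

    // Current behaviour: the POJO field index is reused against the query schema.
    static String typeByPojoIndex(String fieldName) {
        int pojoIndex = pojoFieldNames().indexOf(fieldName);
        return QUERY_TYPES.get(pojoIndex);
    }

    // Proposed behaviour: the query schema index is resolved by field name.
    static String typeByName(String fieldName) {
        int queryIndex = QUERY_NAMES.indexOf(fieldName);
        return QUERY_TYPES.get(queryIndex);
    }

    public static void main(String[] args) {
        System.out.println(pojoFieldNames());             // [a, b, innerPojo]
        System.out.println(typeByPojoIndex("innerPojo")); // INT
        System.out.println(typeByName("innerPojo"));      // ROW
    }
}
```

Looking up 'innerPojo' by its POJO index returns the INT field, which corresponds to the ClassCastException above; looking it up by name returns the nested row as expected.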