Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-13389

Setting DataStream return type breaks some type conversion between Table and DataStream

    XMLWordPrintableJSON

Details

    Description

      When converting between data stream and table, there are situations where only GenericTypeInfo can be successfully applied, but not directly setting the specific RowTypeInfo.
      For example the following code doesn't work

      		TypeInformation<?>[] types = {
      			BasicTypeInfo.INT_TYPE_INFO,
      			TimeIndicatorTypeInfo.ROWTIME_INDICATOR(),
      			BasicTypeInfo.STRING_TYPE_INFO};
      		String[] names = {"a", "b", "c"};
      		RowTypeInfo typeInfo = new RowTypeInfo(types, names);
      		DataStream<Row> ds = env.fromCollection(data).returns(typeInfo);
      		Table sourceTable = tableEnv.fromDataStream(ds, "a,b,c");
      		tableEnv.registerTable("MyTableRow", sourceTable);
      
      		DataStream<Row> stream = tableEnv.toAppendStream(sourceTable, Row.class)
      			.map(a -> a)
      			// this line breaks the conversion, it sets the typeinfo to RowTypeInfo.
      			// without this line the output type is GenericTypeInfo(Row)
      			.returns(sourceTable.getSchema().toRowType());  
      		stream.addSink(new StreamITCase.StringSink<Row>());
      		env.execute();
      

      Attachments

        Activity

          People

            Unassigned Unassigned
            rongr Rong Rong
            Votes:
            0 Vote for this issue
            Watchers:
            6 Start watching this issue

            Dates

              Created:
              Updated: