Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 1.15.1
This issue was discovered on macOS Big Sur.
Description
LineBytesInputFormat reuses the RowData instance it produces, but DeserializationSchemaAdapter#Reader only makes a shallow copy of each produced row. As a result, every collected row ends up holding the value of the last row that was read.
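The effect can be reproduced without Flink. The following plain-Java sketch is an analogy, not Flink's code: `ReuseDemo`, `MutableRow`, and `collect` are hypothetical names. `MutableRow` stands in for the reused `RowData`, and `collect(lines, false)` stands in for a reader that keeps a shallow reference to each record instead of copying it.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Standalone illustration (not Flink code) of the object-reuse pitfall:
 * a producer mutates and re-emits one row instance, and a consumer keeps
 * references to it instead of copies.
 */
public class ReuseDemo {

    /** Hypothetical stand-in for a mutable, reused RowData instance. */
    static final class MutableRow {
        String line;

        /** Deep copy: a new instance that no longer shares state with the reuse object. */
        MutableRow copy() {
            MutableRow c = new MutableRow();
            c.line = line;
            return c;
        }
    }

    /**
     * Emits every input line through the SAME MutableRow instance, the way an
     * input format with object reuse hands out its reuse object.
     * deepCopy == false: the consumer stores the reused reference (shallow).
     * deepCopy == true:  the consumer stores a fresh copy of each record.
     */
    static List<String> collect(List<String> lines, boolean deepCopy) {
        MutableRow reuse = new MutableRow();
        List<MutableRow> collected = new ArrayList<>();
        for (String l : lines) {
            reuse.line = l; // the producer overwrites the reused object
            collected.add(deepCopy ? reuse.copy() : reuse);
        }
        // Read the collected rows only after all input has been consumed.
        List<String> result = new ArrayList<>();
        for (MutableRow r : collected) {
            result.add(r.line);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("one", "two", "three", "four", "five",
                "six", "seven", "eight", "nine", "ten");
        System.out.println("shallow: " + collect(lines, false));
        System.out.println("deep:    " + collect(lines, true));
    }
}
```

Running `main` prints `ten` ten times for the shallow variant, which matches the output reported below, and `one` through `ten` for the variant that copies each record. Copying per record costs one extra allocation per row but is required whenever the producer reuses its output object; in Flink, `TypeSerializer#copy` (for example on a `RowDataSerializer`) is one way to deep-copy a `RowData`.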
Given this program:
package mvillalobos.bug;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class IsThisABatchSQLBug {

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Template table: file metadata columns plus a `line` column holding each record read by the 'raw' format.
        tableEnv.executeSql("CREATE TABLE historical_raw_source_template(\n" +
                " `file.path` STRING NOT NULL METADATA,\n" +
                " `file.name` STRING NOT NULL METADATA,\n" +
                " `file.size` BIGINT NOT NULL METADATA,\n" +
                " `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA,\n" +
                " line STRING\n" +
                " ) WITH (\n" +
                " 'connector' = 'filesystem', \n" +
                " 'format' = 'raw'\n" +
                " );");

        // Concrete table: inherits the template's schema and options and sets the input directory.
        tableEnv.executeSql("CREATE TABLE historical_raw_source\n" +
                " WITH (\n" +
                " 'path' = '/Users/minmay/dev/mvillalobos/historical/data'\n" +
                " ) LIKE historical_raw_source_template;");

        // Read only the `line` column and print every row.
        final TableResult output = tableEnv.from("historical_raw_source").select($("line")).execute();
        output.print();
    }
}
and this sample.csv file in the '/Users/minmay/dev/mvillalobos/historical/data' directory:
one
two
three
four
five
six
seven
eight
nine
ten
The printed result repeats the last line, ten, for all 10 rows instead of returning each line of the file once:
+----+--------------------------------+
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
+----+--------------------------------+
10 rows in set