Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Fixed
Fix Version: 1.14.3
Description
When I use a Parquet Hive table as the lookup table, some records cannot be joined. This can be reproduced by adding the following unit test to HiveLookupJoinITCase.
// create the hive table with columnar storage.
tableEnv.executeSql(
        String.format(
                "create table columnar_table (x string) STORED AS PARQUET "
                        + "tblproperties ('%s'='5min')",
                HiveOptions.LOOKUP_JOIN_CACHE_TTL.key()));

@Test
public void testLookupJoinTableWithColumnarStorage() throws Exception {
    // construct test data; as the DEFAULT_SIZE of VectorizedColumnBatch is 2048, we should
    // write at least 2048 records to the test table.
    List<Row> testData = new ArrayList<>(4096);
    for (int i = 0; i < 4096; i++) {
        testData.add(Row.of(String.valueOf(i)));
    }

    // feed the test data into the hive table through a values source table
    TableEnvironment batchEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.DEFAULT);
    batchEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
    batchEnv.useCatalog(hiveCatalog.getName());
    String dataId = TestValuesTableFactory.registerData(testData);
    batchEnv.executeSql(
            String.format(
                    "create table value_source(x string, p as proctime()) with ("
                            + "'connector' = 'values', 'data-id' = '%s', 'bounded'='true')",
                    dataId));
    batchEnv.executeSql("insert overwrite columnar_table select x from value_source").await();

    // look up every record of value_source in columnar_table; since every key has a match,
    // the anti-join below should produce no rows
    TableImpl flinkTable =
            (TableImpl)
                    tableEnv.sqlQuery(
                            "select t.x as x1, c.x as x2 from value_source t "
                                    + "left join columnar_table for system_time as of t.p c "
                                    + "on t.x = c.x where c.x is null");
    List<Row> results = CollectionUtil.iteratorToList(flinkTable.execute().collect());
    assertTrue(results.size() == 0);
}
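With the bug present this test fails: the "where c.x is null" filter returns exactly the probe rows that unexpectedly found no match in the lookup cache, so the result set is not empty.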
The problem may be caused by the following code: the lookup key is extracted from the reused row before the row is copied. With columnar formats such as Parquet, the reused row is a view over the shared VectorizedColumnBatch, so the fields referenced by the extracted key are overwritten when the reader loads the next batch, and cache entries can no longer be found under their original keys.
RowData row;
while ((row = partitionReader.read(reuse)) != null) {
    count++;
    // the key is extracted from the reused row; only the cached value is copied
    RowData key = extractLookupKey(row);
    List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
    rows.add(serializer.copy(row));
}
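To make the hazard concrete, here is a minimal, self-contained Java sketch. It is not Flink code: ReuseBugSketch and MutableRow are hypothetical stand-ins for a reader that overwrites the same row instance on every read, the way a columnar reader overwrites its batch.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class ReuseBugSketch {

    // hypothetical stand-in for a reused row whose contents are overwritten in place
    static final class MutableRow {
        String value;

        MutableRow(String value) {
            this.value = value;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof MutableRow && Objects.equals(value, ((MutableRow) o).value);
        }

        @Override
        public int hashCode() {
            return Objects.hash(value);
        }
    }

    public static void main(String[] args) {
        Map<MutableRow, List<String>> cache = new HashMap<>();
        MutableRow reuse = new MutableRow(null);

        for (String record : new String[] {"a", "b", "c"}) {
            // the "reader" overwrites the reused row in place
            reuse.value = record;
            // BUG: the key still references the reused object, so its
            // hashCode/equals change after it has been put into the map
            cache.computeIfAbsent(reuse, k -> new ArrayList<>()).add(record);
        }

        // the entries for "a" and "b" are still in the map, but filed under a key
        // that has since mutated to "c", so they can no longer be found
        System.out.println(cache.get(new MutableRow("a"))); // null
        System.out.println(cache.get(new MutableRow("b"))); // null
        System.out.println(cache.get(new MutableRow("c"))); // [c]
    }
}

The Flink case appears analogous: keys extracted from earlier batches are silently invalidated when the next batch reuses the underlying buffers, which is likely why the test above needs more than one batch (over 2048 rows) of data to reproduce the problem.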
It can be fixed with the following modification, which copies the row before the key is extracted.
RowData row;
while ((row = partitionReader.read(reuse)) != null) {
    count++;
    // copy the reused row first, then derive the key from the copy
    RowData rowData = serializer.copy(row);
    RowData key = extractLookupKey(rowData);
    List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
    rows.add(rowData);
}
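With this change, both the cache key and the cached value are derived from a deep copy that is independent of the reused row, so reading later batches can no longer mutate entries already in the cache. The number of copies per record stays the same; the copy simply happens before key extraction instead of after.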