Flink / FLINK-26016

FileSystemLookupFunction does not produce correct results when hive table uses columnar storage


Details

    Description

      When a Parquet-backed Hive table is used as the lookup table, some records fail to join even though matching rows exist. This can be reproduced by adding the following unit test to HiveLookupJoinITCase.

        // create the hive table with columnar storage.
              tableEnv.executeSql(
                      String.format(
                              "create table columnar_table (x string) STORED AS PARQUET "
                                      + "tblproperties ('%s'='5min')",
                              HiveOptions.LOOKUP_JOIN_CACHE_TTL.key()));
      
          @Test
          public void testLookupJoinTableWithColumnarStorage() throws Exception {
              // Construct the test data. The DEFAULT_SIZE of VectorizedColumnBatch is 2048, so we
              // write at least 2048 records to the test table to span more than one batch.
              List<Row> testData = new ArrayList<>(4096);
              for (int i = 0; i < 4096; i++) {
                  testData.add(Row.of(String.valueOf(i)));
              }
      
              // constructs test data using values table
              TableEnvironment batchEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.DEFAULT);
              batchEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
              batchEnv.useCatalog(hiveCatalog.getName());
              String dataId = TestValuesTableFactory.registerData(testData);
              batchEnv.executeSql(
                      String.format(
                              "create table value_source(x string, p as proctime()) with ("
                                      + "'connector' = 'values', 'data-id' = '%s', 'bounded'='true')",
                              dataId));
              batchEnv.executeSql("insert overwrite columnar_table select x from value_source").await();
              TableImpl flinkTable =
                      (TableImpl)
                              tableEnv.sqlQuery(
                                      "select t.x as x1, c.x as x2 from value_source t "
                                              + "left join columnar_table for system_time as of t.p c "
                                              + "on t.x = c.x where c.x is null");
              List<Row> results = CollectionUtil.iteratorToList(flinkTable.execute().collect());
              assertTrue(results.size() == 0);
          }
      

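      The problem may be caused by the following code in FileSystemLookupFunction. The lookup key is extracted from the reused row before the row is copied, so with a columnar format the key may still reference data owned by the reader's reusable batch. Keys already stored in the cache can then change when the next batch is read.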

      RowData row;
      while ((row = partitionReader.read(reuse)) != null) {
          count++;
          // The key is extracted from the reused row, before the row is copied.
          RowData key = extractLookupKey(row);
          List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
          rows.add(serializer.copy(row));
      }
      

      It can be fixed by copying the row first and extracting the lookup key from the copy:

      RowData row;
      while ((row = partitionReader.read(reuse)) != null) {
          count++;
          // Copy first, so that the key no longer references the reused row.
          RowData rowData = serializer.copy(row);
          RowData key = extractLookupKey(rowData);
          List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
          rows.add(rowData);
      }
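
      The suspected mechanism can be illustrated outside Flink with a minimal sketch. Everything in it is hypothetical: RowView stands in for a columnar row that is only a view into a reused batch array, and countMisses stands in for the cache-building loop above. It is not the actual Flink implementation; it only shows how a map key that aliases a reused buffer becomes unreachable once the buffer is overwritten.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReusedRowKeyDemo {

    /** Hypothetical stand-in for a columnar row: a view into a batch array that the reader reuses. */
    static final class RowView {
        final String[] batch;
        final int pos;

        RowView(String[] batch, int pos) {
            this.batch = batch;
            this.pos = pos;
        }

        String value() {
            return batch[pos];
        }

        /** Detaches the row from the reused batch by copying its value into a private array. */
        RowView copy() {
            return new RowView(new String[] {value()}, 0);
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof RowView && value().equals(((RowView) o).value());
        }

        @Override
        public int hashCode() {
            return value().hashCode();
        }
    }

    /** Builds the cache batch by batch, then counts how many input values cannot be looked up. */
    static int countMisses(List<String> data, int batchSize, boolean copyBeforeKey) {
        Map<RowView, List<RowView>> cache = new HashMap<>();
        String[] batch = new String[batchSize]; // reused for every batch, like a column batch

        for (int start = 0; start < data.size(); start += batchSize) {
            int n = Math.min(batchSize, data.size() - start);
            for (int i = 0; i < n; i++) {
                batch[i] = data.get(start + i); // overwrites the previous batch in place
            }
            for (int i = 0; i < n; i++) {
                RowView row = new RowView(batch, i);
                RowView copied = row.copy();
                // Without the copy, the key still points into the reused batch.
                RowView key = copyBeforeKey ? copied : row;
                cache.computeIfAbsent(key, k -> new ArrayList<>()).add(copied);
            }
        }

        int misses = 0;
        for (String v : data) {
            if (!cache.containsKey(new RowView(new String[] {v}, 0))) {
                misses++;
            }
        }
        return misses;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("0", "1", "2", "3");
        System.out.println("misses, key from reused row: " + countMisses(data, 2, false));
        System.out.println("misses, key from copied row: " + countMisses(data, 2, true));
    }
}
```

      With two batches of two values, keys taken from the reused row in the first batch are overwritten when the second batch is loaded, so those values can no longer be found. Keys taken from the copy are unaffected.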
      

          People

            hackergin Feng Jin
            hackergin Feng Jin