Flink / FLINK-27411

Move lookup table source cache logic to flink-table-runtime module




      The idea was inspired by the FLIP https://cwiki.apache.org/confluence/display/FLINK/FLI... on which renqs and Yuan Zhu have done great work. But I suggest implementing it in a slightly different way that allows applying optimizations to caching and depends less on the connectors.

      The point is to move the caching logic to a lower level: the level of the flink-table-runtime operators. The architecture of the lookup join looks like this (see the attached LookupJoin(2).png; red text is a schematic representation of the proposed changes):

      LookupConfig is named this way (rather than CacheConfig) because it can also contain non-cache options for lookup join (for example, 'lookup.max-retries', 'lookup.async'...).

      Changes in connectors: remove their own logic for configs, caching, and retrying queries.
      Changes in public "Table SQL / API": a new class LookupConfig, new ConfigOptions for lookup connectors, and a new method 'getLookupConfig' in LookupTableSource.

      public interface LookupTableSource extends DynamicTableSource {
          /** @return configurations for planning lookup join and executing it in runtime. */
          default LookupConfig getLookupConfig() {
              return null;
          }
      }

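To make the proposed API more concrete, here is a minimal sketch of what a LookupConfig holder could look like. The class name LookupConfigSketch and every field, default, and method name in it are assumptions made for illustration; the issue itself only names the class LookupConfig and the options 'lookup.max-retries' and 'lookup.async'.

```java
import java.time.Duration;

/** Hypothetical sketch of a LookupConfig-like holder; all names and defaults are assumptions. */
final class LookupConfigSketch {
    private final boolean async;        // would mirror 'lookup.async'
    private final int maxRetries;       // would mirror 'lookup.max-retries'
    private final long cacheMaxRows;    // assumed cache option; <= 0 means caching is disabled
    private final Duration cacheTtl;    // assumed cache option

    private LookupConfigSketch(Builder b) {
        this.async = b.async;
        this.maxRetries = b.maxRetries;
        this.cacheMaxRows = b.cacheMaxRows;
        this.cacheTtl = b.cacheTtl;
    }

    boolean isAsync() { return async; }
    int getMaxRetries() { return maxRetries; }
    long getCacheMaxRows() { return cacheMaxRows; }
    Duration getCacheTtl() { return cacheTtl; }
    boolean isCacheEnabled() { return cacheMaxRows > 0; }

    static Builder builder() { return new Builder(); }

    /** Builder so that cache and non-cache options can be set independently. */
    static final class Builder {
        private boolean async = false;
        private int maxRetries = 3;
        private long cacheMaxRows = -1L;
        private Duration cacheTtl = Duration.ofSeconds(10);

        Builder async(boolean value) { this.async = value; return this; }
        Builder maxRetries(int value) { this.maxRetries = value; return this; }
        Builder cacheMaxRows(long value) { this.cacheMaxRows = value; return this; }
        Builder cacheTtl(Duration value) { this.cacheTtl = value; return this; }
        LookupConfigSketch build() { return new LookupConfigSketch(this); }
    }
}
```

A connector would build such an object from its table options and return it from getLookupConfig, leaving the planner and runtime to decide how to cache and retry.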
      Changes in "Table SQL / Planner": class CommonPhysicalLookupJoin and its subclasses.
      Changes in "Table SQL / Runtime": classes LookupJoinCachingRunner, LookupJoinCachingRunnerWithCalc, AsyncLookupJoinCachingRunner, AsyncLookupJoinCachingRunnerWithCalc. We could probably use the decorator pattern here to avoid code duplication and a large number of classes, but our private version is designed this way (maybe not so elegant).
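As an illustration of the decorator idea, the sketch below wraps an arbitrary lookup function in a caching layer. It is not Flink code: CachingLookup is a hypothetical name, the lookup is modelled as a plain java.util.function.Function rather than a real runner, and the LRU eviction based on LinkedHashMap is an assumption chosen for brevity.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical caching decorator around any lookup function (key -> matching rows). */
final class CachingLookup<K, R> implements Function<K, Collection<R>> {
    private final Function<K, Collection<R>> delegate;
    private final Map<K, Collection<R>> cache;

    CachingLookup(Function<K, Collection<R>> delegate, final int maxEntries) {
        this.delegate = delegate;
        // An access-ordered LinkedHashMap gives simple LRU eviction.
        this.cache = new LinkedHashMap<K, Collection<R>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Collection<R>> eldest) {
                return size() > maxEntries;
            }
        };
    }

    @Override
    public Collection<R> apply(K key) {
        Collection<R> cached = cache.get(key);
        if (cached != null) {
            return cached; // cache hit: the wrapped lookup is not called
        }
        // Cache miss: query the wrapped lookup and store a defensive copy.
        Collection<R> rows = new ArrayList<>(delegate.apply(key));
        cache.put(key, rows);
        return rows;
    }
}
```

With this shape, one wrapper could serve lookups with and without a calc step, instead of one caching class per runner variant.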

      With such an architecture we can apply further optimizations to caching:
      1) Caching after calculations. LookupJoinRunnerWithCalc and AsyncLookupJoinRunnerWithCalc (and the proposed LookupJoinCachingRunnerWithCalc and AsyncLookupJoinCachingRunnerWithCalc) use a 'calc' function. The calc function contains calculations on fields of the lookup table, and most of the time these calculations are filters and projections.
      For example, if we join table A with lookup table B using the condition 'JOIN … ON A.id = B.id AND A.age = B.age + 10 WHERE B.salary > 1000', the 'calc' function will contain the filters 'A.age = B.age + 10 AND B.salary > 1000'.

      If we apply this function before storing records in the cache, the size of the cache will be significantly reduced: filters avoid storing useless records in the cache, and projections reduce the size of each record. As a result, the user can increase the initial maximum number of records in the cache.
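The effect of caching after the calc step can be sketched as follows. This is a simplified model, not the proposed runner: CalcBeforeCacheSketch is a hypothetical name, the calc is split into a java.util.function.Predicate (filter) and a Function (projection), and the filter is assumed to reference only lookup-side fields such as B.salary.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical sketch: apply filter and projection before rows reach the cache. */
final class CalcBeforeCacheSketch<K, R, P> {
    private final Function<K, List<R>> lookup;   // raw lookup against the external table
    private final Predicate<R> filter;           // e.g. B.salary > 1000
    private final Function<R, P> projection;     // keeps only the fields the join needs
    private final Map<K, List<P>> cache = new HashMap<>();

    CalcBeforeCacheSketch(
            Function<K, List<R>> lookup, Predicate<R> filter, Function<R, P> projection) {
        this.lookup = lookup;
        this.filter = filter;
        this.projection = projection;
    }

    List<P> lookupCached(K key) {
        List<P> cached = cache.get(key);
        if (cached != null) {
            return cached;
        }
        List<P> result = new ArrayList<>();
        for (R row : lookup.apply(key)) {
            if (filter.test(row)) {
                result.add(projection.apply(row)); // only filtered, projected rows are stored
            }
        }
        cache.put(key, result); // an empty list is cached too, so misses are not re-queried
        return result;
    }

    /** Number of projected rows currently held in the cache. */
    int cachedRowCount() {
        int count = 0;
        for (List<P> rows : cache.values()) {
            count += rows.size();
        }
        return count;
    }
}
```

Rows rejected by the filter never occupy cache space, which is why the same memory budget can hold more keys.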

      2) Constant keys optimization. If the join condition contains constants, for example 'JOIN … ON A.name = B.name AND B.age = 10', we don't need to store '10' in the cache. Currently, TableFunction's 'eval' method is called with the values 'A.name' and 10, so we store '10' in the cache for every row, which is pretty useless.
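One way to picture the constant keys optimization is a helper that builds the cache key from only the non-constant lookup arguments. The class ConstantKeyStripper and its boolean mask are hypothetical; in a real implementation the planner would presumably supply the information about which key positions are constants.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: derive a cache key that omits constant lookup arguments. */
final class ConstantKeyStripper {
    private final boolean[] isConstant; // true where the join condition uses a literal

    ConstantKeyStripper(boolean[] isConstant) {
        this.isConstant = isConstant.clone();
    }

    /** Returns the lookup arguments with all constant positions removed. */
    List<Object> cacheKey(Object... lookupKey) {
        if (lookupKey.length != isConstant.length) {
            throw new IllegalArgumentException("unexpected number of lookup keys");
        }
        List<Object> key = new ArrayList<>();
        for (int i = 0; i < lookupKey.length; i++) {
            if (!isConstant[i]) {
                key.add(lookupKey[i]);
            }
        }
        return key;
    }
}
```

For the condition above, the mask would be {false, true}, so the arguments ("Alice", 10) map to a cache key holding only "Alice".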

      Note that this change does not cover the Hive lookup connector, because it stores all data in memory. This logic can be replaced in the future by the 'ALL' cache strategy mentioned in the original FLIP.


      Attachments: LookupJoin(2).png (58 kB, Alexander Smirnov)

      Assignee: Unassigned
      Reporter: Alexander Smirnov (smiralex)