  Flink / FLINK-24666

Add job level "table.exec.state-stale.error-handling" option and apply to related stateful stream operators


Details

    Description

      In stream processing, records are deleted once they exceed the state TTL (if one is configured). When an update for such a record arrives later, the operator may not be able to handle it properly. We need a unified error-handling mechanism for this situation, instead of each stateful operator handling it on its own, as is currently the case.
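To make the failure mode concrete, here is a minimal, self-contained sketch in plain Java; it does not use Flink's state API. TtlMapState is a hypothetical stand-in for keyed state with a TTL: an entry expires once it is older than ttlMillis, so an update or retraction for the same key that arrives after expiry finds no previous value to work with.

```java
import java.util.HashMap;
import java.util.Map;

// Demo entry point: a retraction that arrives after the TTL finds no state.
class TtlExpiryDemo {
    public static void main(String[] args) {
        TtlMapState<String, String> state = new TtlMapState<>(100); // TTL = 100 ms
        state.put("user-1", "row-v1", 0);              // insert arrives at t=0
        System.out.println(state.get("user-1", 50));   // row-v1 (still within TTL)
        // An update/retraction for the same key arrives at t=150:
        System.out.println(state.get("user-1", 150));  // null (state already expired)
    }
}

// Hypothetical, simplified keyed state with a TTL; NOT Flink's state backend.
// An entry expires once (now - lastUpdate) exceeds ttlMillis.
class TtlMapState<K, V> {
    private static final class Entry<T> {
        final T value;
        final long lastUpdate;

        Entry(T value, long lastUpdate) {
            this.value = value;
            this.lastUpdate = lastUpdate;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final long ttlMillis;

    TtlMapState(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Writes refresh the timestamp; reads do not.
    void put(K key, V value, long now) {
        entries.put(key, new Entry<>(value, now));
    }

    // Returns null if the key was never written or its entry has expired.
    V get(K key, long now) {
        Entry<V> e = entries.get(key);
        if (e == null) {
            return null;
        }
        if (now - e.lastUpdate > ttlMillis) {
            entries.remove(key); // lazily drop the expired entry
            return null;
        }
        return e.value;
    }
}
```

The lazy removal on read is only a simplification for the sketch; the point is that, from the operator's side, an expired key is indistinguishable from one that was never written.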

      For example, RetractableTopNFunction currently has a lenient flag that is not exposed to users:

      // Flag to skip records whose state does not exist instead of failing; true by default.
      private final boolean lenient = true;

      As a result, there is no way to raise an exception when records are unexpectedly cleared by state TTL. This commonly happens either because the TTL is too short (the check at line 190) or because the two internal states (dataState and treeMap) become inconsistent elsewhere.

      List<RowData> inputs = dataState.get(key);
      if (inputs == null) {
          // Skip the data if its state is cleared because of state ttl.
          if (lenient) {
              LOG.warn(STATE_CLEARED_WARN_MSG);
          } else {
              throw new RuntimeException(STATE_CLEARED_WARN_MSG);
          }
      }
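The check above can be modeled as a small runnable sketch. LenientRetractor is a hypothetical operator, not RetractableTopNFunction itself; dataState is a plain HashMap standing in for keyed state, and the warning text is a placeholder rather than the real STATE_CLEARED_WARN_MSG. Because lenient is a private constant set to true, the fail-fast branch can never be reached by a user.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class HardCodedLenientDemo {
    public static void main(String[] args) {
        LenientRetractor op = new LenientRetractor();
        op.dataState.put("k1", new ArrayList<>(List.of("a")));
        System.out.println(op.retract("k1", "a")); // true: state found, row retracted
        System.out.println(op.retract("k2", "a")); // false: state missing, record skipped
    }
}

// Hypothetical operator modeling the per-operator pattern quoted in the issue.
class LenientRetractor {
    // Placeholder text; not the actual Flink message.
    static final String STATE_CLEARED_WARN_MSG = "state cleared, possibly by state TTL";

    // Mirrors `private final boolean lenient = true;` from the issue.
    private final boolean lenient = true;

    // Stand-in for the operator's keyed dataState: key -> rows seen for that key.
    final Map<String, List<String>> dataState = new HashMap<>();

    /** Returns true if state for the key was found, false if the record was skipped. */
    boolean retract(String key, String row) {
        List<String> inputs = dataState.get(key);
        if (inputs == null) {
            if (lenient) {
                // Always taken, because lenient is hard-coded to true.
                System.err.println("WARN: " + STATE_CLEARED_WARN_MSG + " (key=" + key + ")");
                return false;
            } else {
                throw new RuntimeException(STATE_CLEARED_WARN_MSG);
            }
        }
        inputs.remove(row);
        if (inputs.isEmpty()) {
            dataState.remove(key); // drop empty entries to keep state small
        }
        return true;
    }
}
```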

      We should expose this option to users (the default can remain true to stay consistent with previous versions), and it should be unified to cover all stateful stream operators rather than being specific to RetractableTopNFunction.
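A minimal sketch of the unified alternative, assuming the policy is resolved once per job and the same handler is passed to every stateful operator. StaleStateHandler, StaleStateMode, and its SKIP/FAIL constants are hypothetical illustrations only; they are not the actual values of 'table.exec.state-stale.error-handling'. Two plain maps stand in for the keyed state of two different operators.

```java
import java.util.HashMap;
import java.util.Map;

class UnifiedStaleStateDemo {
    public static void main(String[] args) {
        Map<String, String> topNState = new HashMap<>();
        Map<String, String> joinState = new HashMap<>();
        topNState.put("k1", "row-a");

        // Lenient policy (previous default behavior): misses are logged, counted, skipped.
        StaleStateHandler lenient = new StaleStateHandler(StaleStateMode.SKIP);
        System.out.println(lenient.getOrHandle(topNState, "k1", "TopN")); // row-a
        System.out.println(lenient.getOrHandle(joinState, "k1", "Join")); // null (skipped)
        System.out.println(lenient.skippedCount());                       // 1

        // Strict policy: the same miss now fails fast, whichever operator hits it.
        StaleStateHandler strict = new StaleStateHandler(StaleStateMode.FAIL);
        try {
            strict.getOrHandle(joinState, "k1", "Join");
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}

// Hypothetical modes for illustration; not the option's real value set.
enum StaleStateMode { SKIP, FAIL }

// Hypothetical job-level handler shared by all stateful operators.
final class StaleStateHandler {
    private final StaleStateMode mode;
    private int skipped = 0;

    StaleStateHandler(StaleStateMode mode) {
        this.mode = mode;
    }

    /** Looks up key; on a miss, applies the job-wide policy instead of per-operator logic. */
    <K, V> V getOrHandle(Map<K, V> state, K key, String operatorName) {
        V value = state.get(key);
        if (value == null) {
            if (mode == StaleStateMode.FAIL) {
                throw new IllegalStateException(operatorName + ": state for key " + key
                        + " is missing (possibly cleared by state TTL)");
            }
            skipped++;
            System.err.println("WARN " + operatorName + ": skipping record for key " + key);
        }
        return value;
    }

    int skippedCount() {
        return skipped;
    }
}
```

Keeping the lookup-and-handle step in one place means a new stateful operator inherits the job-level policy instead of adding its own private flag; defaulting to the skipping mode would preserve the previous lenient behavior.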

      The new option 'table.exec.state-stale.error-handling' has three values:

       

       


            People

              Assignee: Unassigned
              Reporter: lincoln lee (lincoln.86xy)
