Flink / FLINK-35560

Add query validator support to Flink SQL Gateway via SPI pattern


Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Table SQL / Gateway
    • Labels: None

    Description

      Summary

      Hello, I'd like to propose adding query validator support to the Flink SQL Gateway via the SPI (Service Provider Interface) pattern.

      As a SQL Gateway operator, I need to validate queries so that only safe queries are executed and unsafe ones are rejected.
      To address this need, I propose adding a QueryValidator interface to the Flink SQL Gateway API package.
      This interface would let users plug in their own query validation logic, so that each SQL Gateway operator can enforce the policies that fit their environment.

      Interface

      Below is a draft of the interface.
      It takes an Operation and checks whether the query is valid.

      package org.apache.flink.table.gateway.api.validator;
      
      import org.apache.flink.annotation.Public;
      import org.apache.flink.table.operations.Operation;
      
      /**
       * Interface for implementing a validator that checks the safety of executing queries.
       */
      @Public
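      public interface QueryValidator {

          /**
           * Checks whether the given operation is safe to execute.
           *
           * @param op the parsed operation submitted to the gateway
           * @return {@code true} to allow execution, {@code false} to reject the query
           */
          boolean validateQuery(Operation op);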
      }
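A minimal sketch of how the gateway could discover and apply validators through the SPI pattern, using the JDK's `java.util.ServiceLoader`. Everything here is an assumption for illustration: `Operation` is a stand-in for Flink's `org.apache.flink.table.operations.Operation` (mirroring its `asSummaryString()` method) so the example compiles on its own, `QueryValidators` is a hypothetical helper, and the rule that a query runs only when every registered validator accepts it is one possible policy, not something this proposal specifies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical stand-in for org.apache.flink.table.operations.Operation,
// so this sketch compiles without Flink on the classpath.
interface Operation {
    String asSummaryString();
}

// Same shape as the proposed QueryValidator, declared against the stand-in Operation.
interface QueryValidator {
    boolean validateQuery(Operation op);
}

/**
 * Hypothetical helper: discovers QueryValidator implementations via ServiceLoader
 * and allows an operation only if every discovered validator accepts it.
 */
final class QueryValidators {
    private final List<QueryValidator> validators;

    QueryValidators(List<QueryValidator> validators) {
        // Defensive copy so later changes to the caller's list do not affect validation.
        this.validators = new ArrayList<>(validators);
    }

    /** Loads providers listed in META-INF/services files named after the service interface. */
    static QueryValidators discover(ClassLoader classLoader) {
        List<QueryValidator> found = new ArrayList<>();
        for (QueryValidator validator : ServiceLoader.load(QueryValidator.class, classLoader)) {
            found.add(validator);
        }
        return new QueryValidators(found);
    }

    /** Returns false as soon as one validator rejects the operation; true otherwise. */
    boolean isAllowed(Operation op) {
        for (QueryValidator validator : validators) {
            if (!validator.validateQuery(op)) {
                return false;
            }
        }
        return true;
    }

    int size() {
        return validators.size();
    }
}
```

With this approach, an operator would ship an implementation as a public class with a public no-argument constructor in a jar on the gateway's classpath, and list its fully qualified class name in a resource file named `META-INF/services/org.apache.flink.table.gateway.api.validator.QueryValidator`. When no provider is registered, the list is empty and every query is allowed, which keeps the gateway's default behavior unchanged.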
      

      Example implementation

      Below is an example implementation that inspects the Kafka table option scan.startup.timestamp-millis and rejects the query when the timestamp is more than one day in the past, because consuming from such an old position can cause high disk I/O load.

      package org.apache.flink.table.gateway.api.validator;
      
      import org.apache.flink.table.operations.Operation;
      import org.apache.flink.table.operations.ddl.CreateTableOperation;
      
      import java.util.Map;
      
      /**
       * Rejects CREATE TABLE statements for Kafka tables whose
       * scan.startup.timestamp-millis lies more than one day in the past.
       */
      public class KafkaTimestampValidator implements QueryValidator {
      
          private static final long ONE_DAY_MILLIS = 24 * 60 * 60 * 1000L;
      
          @Override
          public boolean validateQuery(Operation op) {
              // Only CREATE TABLE statements carry connector options to inspect.
              if (!(op instanceof CreateTableOperation)) {
                  return true;
              }
              Map<String, String> options =
                      ((CreateTableOperation) op).getCatalogTable().getOptions();
              if (!"kafka".equals(options.get("connector"))) {
                  return true;
              }
              String startupTimestamp = options.get("scan.startup.timestamp-millis");
              if (startupTimestamp == null) {
                  return true;
              }
              try {
                  // Allow only startup timestamps within the last day.
                  return Long.parseLong(startupTimestamp.trim())
                          >= System.currentTimeMillis() - ONE_DAY_MILLIS;
              } catch (NumberFormatException e) {
                  // Treat a malformed timestamp as unsafe instead of throwing.
                  return false;
              }
          }
      }
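To unit-test this rule without constructing a CreateTableOperation or depending on the wall clock, the option check could be factored into a pure function over the table options, with the current time passed in. A minimal sketch; `KafkaStartupTimestampRule` and `isAllowed` are hypothetical names, and treating a malformed timestamp as unsafe is an assumed policy.

```java
import java.util.Map;

/** Hypothetical helper holding the Kafka startup-timestamp rule as a pure function. */
final class KafkaStartupTimestampRule {
    static final long ONE_DAY_MILLIS = 24 * 60 * 60 * 1000L;

    private KafkaStartupTimestampRule() {}

    /**
     * Returns false if the options describe a Kafka table whose
     * scan.startup.timestamp-millis is more than one day before nowMillis
     * or cannot be parsed as a long; returns true otherwise.
     */
    static boolean isAllowed(Map<String, String> options, long nowMillis) {
        if (!"kafka".equals(options.get("connector"))) {
            return true; // Rule applies only to the Kafka connector.
        }
        String startupTimestamp = options.get("scan.startup.timestamp-millis");
        if (startupTimestamp == null) {
            return true; // No explicit startup timestamp, nothing to check.
        }
        try {
            return Long.parseLong(startupTimestamp.trim()) >= nowMillis - ONE_DAY_MILLIS;
        } catch (NumberFormatException e) {
            return false; // Malformed value: reject rather than throw.
        }
    }
}
```

KafkaTimestampValidator.validateQuery could then delegate to `isAllowed(options, System.currentTimeMillis())`, while tests pass a fixed `nowMillis`.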

      I'd be happy to implement this feature if we can reach an agreement.
      Thanks.


          People

            Assignee: Unassigned
            Reporter: dongwoo.kim
            Votes: 0
            Watchers: 1
