  1. Apache Airflow
  2. AIRFLOW-4965

Handling throttling in GCP AI operators



    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 1.10.3
    • Fix Version/s: 1.10.6
    • Component/s: gcp
    • Labels:


      Polidea develops Apache Airflow operators for the following Google Cloud AI services:

      • Cloud Translate
      • Cloud Vision
      • Cloud Text-To-Speech
      • Cloud Speech-To-Text
      • Cloud Translate Speech
      • Cloud Natural Language
      • Cloud Video Intelligence

      These APIs verify quotas and throttle requests that exceed them; each service's quota documentation describes the details.







      There are several types of quotas and limits:

      Translate

      • characters per day [403 error “Daily Limit Exceeded”]
      • characters per 100 seconds (per project or per project/user) [403 error “User Rate Limit Exceeded”] [TEMPORARY]

      Vision

      • image file size
      • requests per minute [TEMPORARY]
      • images per feature per month


      Text to speech

      • total characters per request
      • requests per minute [TEMPORARY]
      • characters per minute [TEMPORARY]


      Speech to text

      • limits of the content size
      • limits of the phrases/characters per request for context
      • requests per 60 seconds [TEMPORARY]
      • processing per day

      Natural Language

      • Text Content size 
      • Token quota and Entity mentions (ignored?)
      • requests per 100 seconds [TEMPORARY]
      • requests per day 

      Video Intelligence

      • video size
      • requests per minute [TEMPORARY]
      • backend time in seconds per minute [TEMPORARY]


      All Cloud AI operators use the Python client API. Most of its methods use the built-in Retry object and retry mechanism. We assume that, wherever a method uses this mechanism, it is implemented correctly and by default retries only “retriable” errors. The user can configure the behaviour of the Retry object (exponential back-off factor, delays, etc.). In the current API, the user who creates the DAG can pass a Retry object to the operator.

      The APIs that use the Retry object are:
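
      To make the back-off settings concrete, here is a minimal standard-library sketch of how an exponential delay schedule could be derived from three knobs. The function name and the parameters `initial`, `multiplier` and `maximum` are chosen for illustration only; this is not the client library's code, and real implementations typically also add random jitter.

```python
import itertools


def backoff_delays(initial=1.0, multiplier=2.0, maximum=60.0):
    """Yield successive retry delays in seconds, capped at `maximum`.

    Hypothetical helper: it only illustrates the arithmetic behind an
    exponential back-off schedule.
    """
    delay = initial
    while True:
        yield min(delay, maximum)
        delay *= multiplier


# First six delays for a 1 s start that doubles and is capped at 10 s.
schedule = list(itertools.islice(backoff_delays(1.0, 2.0, 10.0), 6))
print(schedule)
```

      A larger multiplier or a lower cap changes how quickly a burst of throttled requests backs off, which is the trade-off a DAG author would tune.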

      • Cloud Vision Product Search
      • Cloud Vision Extra
      • Cloud Vision Detect
      • Cloud Natural Language
      • Cloud Speech
      • Cloud Video Intelligence


      The Retry mechanism provided by the client API should be enough to handle temporary bursts of requests. Users can control the exponential back-off rate and adjust it to their own needs. They can also manually restart failed jobs using standard Airflow mechanisms if their configuration is not well matched to their limits.


      The only case where Retry is not used in the API is the Translate operator, specifically the translate API.


      For the Translate API, the proposal is to use a retry decorator in our own hook and to retry only on the “User Rate Limit Exceeded” error. All other errors (size limit and “Daily Limit Exceeded”) should be treated as non-retriable; in those cases users will be able to manually restart failed jobs.
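
      The retriable/non-retriable split described above could be expressed as a small predicate. This is only a sketch: `TranslateApiError` is a hypothetical stand-in for whatever exception the Translate client raises, and matching on the message text is an assumption about how the error can be recognised.

```python
# Only the short-term rate limit is treated as retriable.
RETRIABLE_REASONS = ("User Rate Limit Exceeded",)


class TranslateApiError(Exception):
    """Hypothetical stand-in for the error raised by the Translate client."""

    def __init__(self, code, message):
        super().__init__(message)
        self.code = code
        self.message = message


def is_temporary_quota_error(exc):
    """Return True only for 403 errors caused by the short-term rate limit."""
    if not isinstance(exc, TranslateApiError) or exc.code != 403:
        return False
    return any(reason in exc.message for reason in RETRIABLE_REASONS)


print(is_temporary_quota_error(TranslateApiError(403, "User Rate Limit Exceeded")))
print(is_temporary_quota_error(TranslateApiError(403, "Daily Limit Exceeded")))
```

      Keeping the retriable reasons in one tuple makes the policy easy to review: anything not listed fails immediately and is left to a manual restart.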



      We analyzed two solutions:

      1. extension of the built-in mechanism from google-cloud-python library - Retry
      2. external library - tenacity


      The first solution seems natural, but it is problematic. By default, each method creates its retry object from a configuration stored in a private file:
      https://github.com/googleapis/google-cloud-python/blob/b718d2d9bb32b0e7934ae90d57dc80c81ce0fb73/vision/google/cloud/vision_v1/gapic/image_annotator_client.py#L296-L304
      To extend this mechanism, we would have to copy that configuration logic. The google-cloud-python library does not make it easy to change only part of the retry object's configuration.


      The built-in retry mechanism is not supported by all services (see: Current approach), so a separate mechanism is needed. A new mechanism based on an external library will work with all services and provide a more predictable developer experience.


      The tenacity library provides a decorator-based retry mechanism. It uses a wait strategy that applies exponential backoff. All hook methods that are covered by quota restrictions will get the new decorator.


      Sample implementation:

      import tenacity

      @tenacity.retry(
          wait=tenacity.wait_exponential(min=1, max=100),
          retry=retry_if_temporary_quota(),
      )
      def fetch():
          response = client.translate(TEXT, target_language="PL")['translatedText']
          return response



      retry_if_temporary_quota is a factory method that creates a predicate to check if the exception concerns the quota restriction.
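
      To show how a predicate factory and an exponential wait fit together, here is a self-contained standard-library sketch of the retry loop. It is not tenacity and not the hook's actual code: the `retry` decorator, its parameters, and the message check inside `retry_if_temporary_quota` are illustrative assumptions, and `sleep` is injectable so the example runs without waiting.

```python
import functools
import time


def retry_if_temporary_quota():
    """Build a predicate that is True only for temporary quota errors."""
    def predicate(exc):
        # Assumption: the temporary limit is recognisable by its message.
        return "User Rate Limit Exceeded" in str(exc)
    return predicate


def retry(should_retry, min_wait=1, max_wait=100, max_attempts=5, sleep=time.sleep):
    """Retry the wrapped call with doubling waits while should_retry(exc) is True."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            wait = min_wait
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    # Give up on the last attempt or on a non-retriable error.
                    if attempt == max_attempts or not should_retry(exc):
                        raise
                    sleep(wait)
                    wait = min(wait * 2, max_wait)
        return wrapper
    return decorator


waits = []            # records requested waits instead of actually sleeping
calls = {"count": 0}  # counts how often the fake API was called


@retry(retry_if_temporary_quota(), sleep=waits.append)
def fetch():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("403 User Rate Limit Exceeded")
    return "translated"


result = fetch()
print(result, waits, calls["count"])
```

      Here the fake call fails twice with the temporary error and succeeds on the third attempt; an error with any other message would propagate on the first failure.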




            • Assignee:
              dzakus13 Kamil