Apache Airflow / AIRFLOW-5927

Airflow cache import file or variable


    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.10.3
    • Fix Version/s: None
    • Component/s: DAG, database
    • Labels:
      None

      Description

      I have a `config.py` that pulls configuration from `Variable` and merges it into a default config:

```
from datetime import datetime
from airflow.models import Variable


class Config:
    version = "V21"
    etl_feature_dir = f'/data/dm/sales_forecast/features/version={version}'
    forecast_result_dir = f'/data/dm/sales_forecast/results/fbprophet/version={version}'
    forecast_model_dir = f'/data/dm/sales_forecast/models/fbprophet/version={version}'
    forecast_result_s3_dir = f's3a://xxxxx/data/dm/sales_forecast/fbprophet/version={version}'

    etl_dir = '/data/dm/sales_forecast/etl'
    feature_current_path = 'hdfs://pupuxdc/test/MERGE_OUT'
    dag_start_date = datetime(2019, 10, 25)
    etl_start_time = "2019-06-01 00:00:00"
    etl_end_time = " (execution_date + macros.timedelta(days=8)).strftime('%Y-%m-%d 00:00:00') "
    train_start_time = " (execution_date - macros.timedelta(days=90)).strftime('%Y-%m-%d 00:00:00') "
    train_end_time = " execution_date.strftime('%Y-%m-%d 00:00:00') "
    predict_start_time = " execution_date.strftime('%Y-%m-%d 00:00:00') "
    predict_end_time = " (execution_date + macros.timedelta(days=7)).strftime('%Y-%m-%d 00:00:00') "
    report_start_date = " (execution_date - macros.timedelta(days=6)).strftime('%Y-%m-%d 00:00:00') "
    report_end_date = " execution_date.strftime('%Y-%m-%d 00:00:00') "
    sf_schedule_report = "30 8 * * *"
    sf_schedule_etl = "30 1 * * *"
    sf_schedule_main_flow = "45 2 * * *"


CONFIG_KEY = 'sf_config_%s' % Config.version
sf_config = Variable.get(CONFIG_KEY, deserialize_json=True, default_var={})
if sf_config:
    for k, v in sf_config.items():
        print(f'Overwrite {k} by {v}')
        if hasattr(Config, k):
            if k == 'dag_start_date':
                setattr(Config, k, datetime.strptime(v, '%Y-%m-%d'))
            elif v == 'None':
                setattr(Config, k, None)
            else:
                setattr(Config, k, v)
```

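The merge loop in `config.py` runs at module import time, i.e. once per process that parses the file. A minimal sketch without Airflow (here `sf_config` is a plain dict standing in for the deserialized Variable payload) shows the `setattr` override in isolation:

```python
from datetime import datetime


class Config:
    version = "V21"
    dag_start_date = datetime(2019, 10, 25)
    sf_schedule_etl = "30 1 * * *"


# Simulated payload, as if returned by Variable.get(..., deserialize_json=True).
sf_config = {"sf_schedule_etl": "45 1 * * *", "dag_start_date": "2019-11-01"}

# This loop mutates the class attributes in place; the changes last for
# the lifetime of the interpreter process that imported the module.
for k, v in sf_config.items():
    if hasattr(Config, k):
        if k == "dag_start_date":
            setattr(Config, k, datetime.strptime(v, "%Y-%m-%d"))
        elif v == "None":
            setattr(Config, k, None)
        else:
            setattr(Config, k, v)

print(Config.sf_schedule_etl)  # 45 1 * * *
```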
      And I have 5 DAG files that import this Config. They share similar code:
```
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.models import Variable

from sf_dags_n.config import Config

default_args = {
    'owner': 'mithril',
    'depends_on_past': False,
    'email': ['mithril'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
}

dag = DAG('dm_sfx_etl_%s' % Config.version,
          start_date=Config.dag_start_date,
          default_args=default_args,
          schedule_interval='20 1 * * *',
          user_defined_filters={'mod': lambda s, d: s % d},
          )
```
      The strange thing is:

      Changing `sf_schedule_etl` in the Variable took effect several times, but at some point I could not change it from the Variable any more, even when I hard-coded it directly:
      ```
      dag = DAG('dm_sfx_etl_%s' % Config.version,
                start_date=Config.dag_start_date,
                default_args=default_args,
                schedule_interval='20 1 * * *',
                user_defined_filters={'mod': lambda s, d: s % d},
                )
      ```
       
      If such situation came, even delete dag file and delete from airflow webui ,didn't change `schedule_interval` . 
       
      PS: my dag file have running for some days, in these days ,I may add some operator to it , or change some operator type, but it still fine .   I think there must be some cache in airflow lead to this  problem.
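One way to reproduce this symptom without Airflow: module-level code is executed only when a module is first imported, so a long-lived process keeps whatever value was read at import until the module is re-parsed. The sketch below simulates that with a throwaway module (`sim_config.py` and the file-backed `store` are invented names for the illustration, standing in for `sf_dags_n/config.py` and the Variable table):

```python
import importlib
import pathlib
import sys
import tempfile

tmp = pathlib.Path(tempfile.mkdtemp())
store = tmp / "store.txt"            # stands in for the Airflow Variable table
store.write_text("30 1 * * *")

module_src = tmp / "sim_config.py"   # stands in for sf_dags_n/config.py
module_src.write_text(
    "import pathlib\n"
    f"schedule = pathlib.Path({str(store)!r}).read_text()\n"
)

sys.path.insert(0, str(tmp))
import sim_config
print(sim_config.schedule)           # 30 1 * * *

store.write_text("45 2 * * *")       # "edit the Variable"
print(sim_config.schedule)           # still 30 1 * * * -- cached in this process

importlib.reload(sim_config)         # only a fresh parse sees the new value
print(sim_config.schedule)           # 45 2 * * *
```

Any process that never re-imports the module (or re-imports a stale copy) behaves exactly like a cache that cannot be invalidated.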
            People

            • Assignee: Unassigned
            • Reporter: mithril kasim
