scheduled tasks - Airflow: Passing variable in DAG - Stack Overflow

admin2025-04-19  5

Tl;dr:

How to access parameters in dag?

I'm referring to input_number in params:

with DAG(dag_id=v_dag_id,
     description=f'Ejecución SQL {v_project_name}',
     tags=['caba', '16860', 'kde063'],
     default_args=v_default_args,
     schedule_interval=None,
     max_active_tasks=12,
     max_active_runs=10,
     start_date=v_start_date,
     catchup=v_catchup,
     params = {"input_number": 13} # <------ this number
        ) as dag:

# Task group that holds one CustomExadataOperator task per loop index.
with TaskGroup(group_id='test_exadata') as test_exadata:

        # Operators are collected in this list as they are created.
        a = []
        for i in range(0,13): # <------ how to parametrize this number 13 to be equal to input_number????
            # Task ids run from 'test_exadata_0' to 'test_exadata_12'.
            nombre = 'test_exadata_'+str(i)
            a.append(
            CustomExadataOperator(
                task_id=nombre,
                repo_url= REPO_URL,
                repo_branch= REPO_BRANCH,
                # Every task in the group points at the same SQL file.
                sql_path= f'src/sql/test_query.sql',
                engine= SECRETS_ENGINE,
                secret= SECRETS_PATH,
                vault_token_id= 'AIRFLOW_VAULT_TOKEN',
                output_mode= 'log',
                dag= dag
                )
            )

test_exadata

Long story:

I have a dag in airflow where I set up some default parameters such as last month date (format YYYYMM), the number of months I want to go back in my analysis (an integer) and the name of the table I want to create in my database.

Basically, given month_input : 202412, months_hist_input : 13 and prefix_name_table_input : TEMP_XXX_, I wrote code that, starting from 202412 and going backwards, creates the tables TEMP_XXX_202412, TEMP_XXX_202411, ..., TEMP_XXX_202401.

The parameters are set in the argument params:

with DAG(dag_id=v_dag_id,
         description=f'Ejecución SQL {v_project_name}',
         tags=['caba', '16860', 'kde063'],
         default_args=v_default_args,
         schedule_interval=None,
         max_active_tasks=12,
         max_active_runs=10,
         start_date=v_start_date,
         catchup=v_catchup,
         params = {
             "month_input": (date.today()+relativedelta(months=-1)).strftime('%Y%m'),
             "months_hist_input": 13,
             "prefix_name_table_input": "TEMP_XXX_"
             }
            ) as dag:

I access these parameters in the following way (within the DAG):

def _create_dates_and_names(month, months_hist, prefix_name_table):
    """Push the months to process and their table names to XCom.

    Starting at ``month`` and stepping back one month at a time, builds
    ``months_hist`` month strings in ``YYYYMM`` format plus one table name
    per month, then pushes both lists to XCom under the keys ``"dates"``
    and ``"names"``.

    Args:
        month: Most recent month as a ``'%Y%m'`` string, e.g. ``'202412'``.
        months_hist: Number of months to generate. Converted with ``int()``
            because the value is passed in as a rendered template string.
        prefix_name_table: Table-name prefix. A single ``_`` separator is
            added only when the prefix does not already end with one, so
            ``'TEMP_XXX_'`` and ``'TEMP_XXX'`` both give ``TEMP_XXX_202412``.

    Raises:
        ValueError: If ``months_hist`` is not an integer string or ``month``
            does not match ``'%Y%m'``.
    """
    months_hist = int(months_hist)

    # Parse the reference month once instead of on every iteration.
    latest_month = datetime.strptime(month, '%Y%m')
    dates = [
        (latest_month - relativedelta(months=offset)).strftime('%Y%m')
        for offset in range(months_hist)
    ]

    # Avoid a double underscore when the prefix already ends with '_'.
    separator = '' if prefix_name_table.endswith('_') else '_'
    names = [f'{prefix_name_table}{separator}{month_str}' for month_str in dates]

    # Publish both lists so downstream tasks can index into them by position.
    ti = get_current_context()["ti"]
    ti.xcom_push(key="dates", value=dates)
    ti.xcom_push(key="names", value=names)


# Task that runs _create_dates_and_names with the three DAG params rendered
# from Jinja templates, so the callable receives them as strings.
# `provide_context` is intentionally not passed: it is deprecated and has no
# effect since Airflow 2.0, and the callable obtains the context itself via
# get_current_context().
create_dates_and_names = PythonOperator(
    task_id='create_dates_and_names',
    python_callable=_create_dates_and_names,
    op_kwargs={
        'month': '{{params["month_input"]}}',
        'months_hist': '{{params["months_hist_input"]}}',
        'prefix_name_table': '{{params["prefix_name_table_input"]}}',
    },
)

And then use these "values" in the following task through the sql_args parameter:

    # Task group that holds one CustomExadataOperator task per loop index; each
    # task receives the i-th date and i-th table name pushed to XCom by the
    # 'create_dates_and_names' task.
    with TaskGroup(group_id='test_exadata') as test_exadata:

        # Operators are collected in this list as they are created.
        a = []

        for i in range(0,13):  # <----------- how to parametrize this number 13 to be equal to months_hist_input????
            # Task ids run from 'test_exadata_0' to 'test_exadata_12'.
            nombre = 'test_exadata_'+str(i)
            a.append(
            CustomExadataOperator(
                task_id=nombre,
                repo_url= REPO_URL,
                repo_branch= REPO_BRANCH,
                sql_path= f'src/sql/test_query.sql',
                # In this f-string '{{{{' and '}}}}' produce literal '{{' and '}}',
                # while '{i}' is replaced by the loop index. The result is two
                # space-separated template expressions that pull element i of the
                # "dates" and "names" XCom lists.
                # NOTE(review): presumably sql_args is a templated field of
                # CustomExadataOperator - confirm against the operator.
                sql_args = f'{{{{ti.xcom_pull(key="dates",task_ids="create_dates_and_names")[{i}]}}}} {{{{ti.xcom_pull(key="names",task_ids="create_dates_and_names")[{i}]}}}}',
                engine= SECRETS_ENGINE,
                secret= SECRETS_PATH,
                vault_token_id= 'AIRFLOW_VAULT_TOKEN',
                output_mode= 'log',
                trigger_rule = 'none_failed_min_one_success',
                dag= dag
                )
            )

create_dates_and_names >> test_exadata

This works.

The question is: how can I parametrize the number of iterations of the loop in the task group? I'm referring to the 13 in for i in range(0,13):, which I would like to be equal to the input parameter months_hist_input.

I cannot write for i in range(0,int({{params["months_hist_input"]}})): because it will take {{params["months_hist_input"]}} as the string "{{params["months_hist_input"]}}".

Should I customize the group task using a function?

转载请注明原文地址:http://anycun.com/QandA/1745028507a90351.html