TL;DR:
How can I access the DAG parameters while the DAG itself is being defined?
I'm referring to `input_number` in `params`:
# Minimal example: one DAG holding a task group of identical Exadata SQL
# tasks, one task per index.
with DAG(
    dag_id=v_dag_id,
    description=f'Ejecución SQL {v_project_name}',
    tags=['caba', '16860', 'kde063'],
    default_args=v_default_args,
    schedule_interval=None,
    max_active_tasks=12,
    max_active_runs=10,
    start_date=v_start_date,
    catchup=v_catchup,
    params={"input_number": 13},  # <------ this number
) as dag:
    with TaskGroup(group_id='test_exadata') as test_exadata:
        # <------ how to parametrize this number 13 to be equal to input_number????
        # The bound below is a plain Python literal evaluated while this file
        # is parsed; it is not linked to params["input_number"].
        a = [
            CustomExadataOperator(
                task_id=f'test_exadata_{i}',
                repo_url=REPO_URL,
                repo_branch=REPO_BRANCH,
                sql_path='src/sql/test_query.sql',
                engine=SECRETS_ENGINE,
                secret=SECRETS_PATH,
                vault_token_id='AIRFLOW_VAULT_TOKEN',
                output_mode='log',
                dag=dag,
            )
            for i in range(13)
        ]
    test_exadata
Long story:
I have an Airflow DAG in which I set some default parameters: the previous month (format YYYYMM), the number of months I want to go back in my analysis (an integer), and the name prefix of the tables I want to create in my database.
Basically, given `month_input`: 202412, `months_hist_input`: 13 and `prefix_name_table_input`: TEMP_XXX_, I wrote code that, starting from 202412 and going backwards, creates the tables TEMP_XXX_202412, TEMP_XXX_202411, ..., TEMP_XXX_202312 (13 tables in total).
The parameters are set in the `params` argument:
# DAG definition. `params` holds the default values of the run parameters
# that the tasks read through Jinja templates.
with DAG(dag_id=v_dag_id,
description=f'Ejecución SQL {v_project_name}',
tags=['caba', '16860', 'kde063'],
default_args=v_default_args,
schedule_interval=None,
max_active_tasks=12,
max_active_runs=10,
start_date=v_start_date,
catchup=v_catchup,
params = {
# Previous calendar month as YYYYMM, computed each time this file is evaluated.
"month_input": (date.today()+relativedelta(months=-1)).strftime('%Y%m'),
# Number of months to generate, counting back from month_input.
"months_hist_input": 13,
# Prefix used to build the table names.
"prefix_name_table_input": "TEMP_XXX_"
}
) as dag:
I access these parameters in the following way (within the DAG):
def _create_dates_and_names(month, months_hist, prefix_name_table):
    """Build the month list and matching table names and push both to XCom.

    Starting at ``month`` and stepping back one month at a time, produces
    ``months_hist`` month strings (``YYYYMM``) and one table name per month,
    then publishes them under the XCom keys ``"dates"`` and ``"names"`` of
    the currently running task instance.

    Args:
        month: Most recent month, formatted ``YYYYMM``.
        months_hist: How many months to generate. Accepts anything ``int()``
            accepts, e.g. a rendered template string such as ``"13"``.
        prefix_name_table: Table-name prefix. Trailing underscores are
            ignored so that exactly one ``_`` separates prefix and month.

    Raises:
        ValueError: If ``month`` does not match ``%Y%m`` or ``months_hist``
            cannot be converted to an integer.
    """
    months_hist = int(months_hist)
    # Parse once: the starting month is invariant across iterations.
    start = datetime.strptime(month, '%Y%m')
    dates = [
        (start - relativedelta(months=offset)).strftime('%Y%m')
        for offset in range(months_hist)
    ]
    # Both "TEMP_XXX_" and "TEMP_XXX" must give "TEMP_XXX_<YYYYMM>".
    # Unconditionally inserting "_" doubled the separator for prefixes that
    # already end with an underscore.
    prefix = prefix_name_table.rstrip('_')
    names = [f'{prefix}_{d}' for d in dates]

    ti = get_current_context()["ti"]
    ti.xcom_push(key="dates", value=dates)
    ti.xcom_push(key="names", value=names)
# Task that computes the month list and table names and publishes them via
# XCom for the downstream tasks.
create_dates_and_names = PythonOperator(
    task_id='create_dates_and_names',
    python_callable=_create_dates_and_names,
    # Jinja templates: Airflow renders them from the run's params when the
    # task executes, not when this file is parsed.
    op_kwargs={
        'month': '{{params["month_input"]}}',
        'months_hist': '{{params["months_hist_input"]}}',
        'prefix_name_table': '{{params["prefix_name_table_input"]}}',
    },
    # `provide_context=True` was dropped: it is a deprecated no-op in
    # Airflow 2, and the callable obtains its context through
    # get_current_context().
)
I then use these values in the following tasks through the `sql_args` parameter:
# One Exadata SQL task per month; task i receives the i-th date and the i-th
# table name pulled from the XComs of create_dates_and_names.
with TaskGroup(group_id='test_exadata') as test_exadata:
    # <----------- how to parametrize this number 13 to be equal to months_hist_input????
    # The bound below is a plain Python literal evaluated while this file is
    # parsed; the templates in sql_args are rendered later, per task run.
    a = [
        CustomExadataOperator(
            task_id=f'test_exadata_{i}',
            repo_url=REPO_URL,
            repo_branch=REPO_BRANCH,
            sql_path='src/sql/test_query.sql',
            sql_args=f'{{{{ti.xcom_pull(key="dates",task_ids="create_dates_and_names")[{i}]}}}} {{{{ti.xcom_pull(key="names",task_ids="create_dates_and_names")[{i}]}}}}',
            engine=SECRETS_ENGINE,
            secret=SECRETS_PATH,
            vault_token_id='AIRFLOW_VAULT_TOKEN',
            output_mode='log',
            trigger_rule='none_failed_min_one_success',
            dag=dag,
        )
        for i in range(13)
    ]

# The dates/names must be computed before any task of the group runs.
create_dates_and_names >> test_exadata
This works.
The question is: how can I parametrize the number of iterations used to build the task group? I'm referring to the 13 in `for i in range(0,13):`, which I would like to be equal to the input parameter `months_hist_input`.
I cannot write `for i in range(0,int({{params["months_hist_input"]}})):` because the template is not rendered at that point: `{{params["months_hist_input"]}}` is taken as the literal string "{{params["months_hist_input"]}}".
Should I customize the group task using a function?