1 year ago
#223323
lifanxi
How to create TaskFlow DAGs dynamically in Airflow 2.0?
I have a parameterized DAG and I want to programmatically create DAG instances based on it.
In the traditional Airflow model, I can achieve this easily using a loop:
# Code sample from: https://github.com/astronomer/dynamic-dags-tutorial/blob/main/dags/dynamic-dags-loop.py
def create_dag(dag_id,
               schedule,
               dag_number,
               default_args):

    def hello_world_py(*args):
        print('Hello World')
        print('This is DAG: {}'.format(str(dag_number)))

    dag = DAG(dag_id,
              schedule_interval=schedule,
              default_args=default_args)

    with dag:
        t1 = PythonOperator(
            task_id='hello_world',
            python_callable=hello_world_py)

    return dag


for n in range(1, 4):
    dag_id = 'loop_hello_world_{}'.format(str(n))
    # ...
    globals()[dag_id] = create_dag(dag_id,
                                   schedule,
                                   dag_number,
                                   default_args)
How can I implement similar behavior in Airflow 2.0 using the TaskFlow model?
I have tried to apply the @dag decorator manually, as in the following code, but it does not work: the dynamically created DAGs do not have any tasks in them.
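import time
from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['test'])
def flow_test():
    @task()
    def sleep():
        time.sleep(5)

    t1 = sleep()


# flow_test is already decorated here, so @dag is applied a second time
for n in range(1, 3):
    dag_id = 'flow_test_{}'.format(str(n))
    globals()[dag_id] = dag(dag_id=dag_id, tags=['test'])(flow_test)()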
airflow-2.x
airflow-taskflow