# Learning example: Airflow DAG that creates one processing task per file in a folder.
import os
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
def process_file(*args, **kwargs):
    """Announce the file handed to this task on standard output.

    Positional arguments are accepted but ignored. The path is read from the
    ``file_path`` keyword argument; any other keyword arguments are ignored.

    Raises:
        KeyError: if no ``file_path`` keyword argument was supplied.
    """
    file_path = kwargs['file_path']
    print(f"Processing file: {file_path}")
# Arguments handed to the DAG as its ``default_args``.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    # No e-mail notifications, neither on failure nor on retry.
    'email_on_failure': False,
    'email_on_retry': False,
    # One retry, five minutes after the failed attempt.
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2023, 4, 19),
}
# The DAG object that every task below attaches to.
dag = DAG(
    dag_id='dynamic_dag_files_example',
    description='Dynamic DAG for processing files with individual tasks',
    default_args=default_args,
    # Scheduled at a one-day interval, with catchup turned off.
    schedule_interval=timedelta(days=1),
    catchup=False,
)
# Placeholder tasks marking the entry and exit points of the DAG; the
# per-file tasks are wired between them.
start_task = DummyOperator(task_id='start_task', dag=dag)
end_task = DummyOperator(task_id='end_task', dag=dag)
# Folder whose regular files each get their own task. It is listed at module
# level, i.e. every time this file is imported.
folder_path = '/path/to/your/folder'

# os.listdir() returns entries in arbitrary order. The task ids below are
# derived from the list index, so sort the paths to keep the
# index -> file mapping stable from one import of this module to the next.
file_list = sorted(
    candidate
    for candidate in (
        os.path.join(folder_path, entry) for entry in os.listdir(folder_path)
    )
    if os.path.isfile(candidate)  # skip sub-directories and other non-files
)

for i, file_path in enumerate(file_list):
    task_id = f'process_file_{i}'
    dynamic_task = PythonOperator(
        task_id=task_id,
        python_callable=process_file,
        op_args=[],
        op_kwargs={'file_path': file_path},
        # NOTE(review): provide_context is reportedly deprecated in
        # Airflow 2.x -- confirm against the deployed Airflow version.
        provide_context=True,
        dag=dag,
    )
    # Wire inside the loop so that every per-file task (not only the last
    # one created) sits between the start and end markers.
    start_task >> dynamic_task >> end_task