# 学習 (Learning): example Airflow DAG that creates one task per file in a folder.

import os
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

def process_file(*args, **kwargs):
    """Report the file handled by one dynamically generated task.

    The path is read from the ``file_path`` keyword argument (supplied through
    the operator's ``op_kwargs``); if that key is absent a ``KeyError`` is
    raised. Positional arguments and any other keyword arguments are accepted
    and ignored.
    """
    target = kwargs['file_path']
    print("Processing file: {}".format(target))

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2023, 4, 19),
}

dag = DAG(
    'dynamic_dag_files_example',
    default_args=default_args,
    description='Dynamic DAG for processing files with individual tasks',
    schedule_interval=timedelta(days=1),
    catchup=False,
)

# No-op anchors: every per-file task runs after start_task and before end_task.
start_task = DummyOperator(
    task_id='start_task',
    dag=dag,
)

end_task = DummyOperator(
    task_id='end_task',
    dag=dag,
)

# Directory scanned (at DAG-parse time) for input files.
# NOTE: placeholder path - set it to a real directory; if the directory does
# not exist, os.scandir raises and this DAG file fails to import.
folder_path = '/path/to/your/folder'


def _list_files(folder):
    """Return full paths of the regular files directly inside *folder*, sorted.

    The sort matters: directory listings come back in arbitrary order, and
    this module is re-parsed whenever the DAG is loaded. Without a stable
    order, the index-based task id ``process_file_<i>`` could be bound to a
    different file on different parses.
    """
    with os.scandir(folder) as entries:
        # DirEntry.is_file() follows symlinks, matching os.path.isfile.
        return sorted(entry.path for entry in entries if entry.is_file())


file_list = _list_files(folder_path)

# One task per file. Task ids are index-based, so they are stable only while
# the folder's contents are unchanged: adding or removing a file renumbers
# every task that sorts after it.
for i, file_path in enumerate(file_list):
    dynamic_task = PythonOperator(
        task_id=f'process_file_{i}',
        python_callable=process_file,
        # file_path is bound eagerly here, so there is no late-binding issue.
        op_kwargs={'file_path': file_path},
        # Passes the task context on Airflow 1.10; deprecated and unnecessary
        # on Airflow 2. process_file ignores the extra context kwargs.
        provide_context=True,
        dag=dag,
    )

    start_task >> dynamic_task >> end_task

# With no files there are no middle tasks; keep start_task before end_task.
if not file_list:
    start_task >> end_task