Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- Для объединения четырех DAG-ов в один с использованием `TaskGroup`, можно следовать следующему подходу. Этот пример будет базироваться на объединении кода, который вы предоставили. Мы создадим один DAG, в который включим все необходимые `TaskGroup`, соответствующие отдельным DAG-ам.
- ```python
- from airflow.decorators import dag
- from airflow import DAG
- from airflow.utils.dates import days_ago
- from datetime import datetime, timedelta
- from airflow.operators.python import PythonOperator
- from airflow.providers.postgres.operators.postgres import PostgresOperator
- from airflow.utils.task_group import TaskGroup
- from airflow.operators.trigger_dagrun import TriggerDagRunOperator
- # Импорт необходимых функций и утилит
- from lib.utils import get_scripts_path, get_pg_sqlalchemy_engine
- from projects_scripts.helpers import failure_callback, send_failure_notifications, checker
- # Общие параметры DAG-а
- default_args = {
- "retries": 2,
- "email_on_retry": False,
- "email_on_failure": True,
- "depends_on_past": False,
- "retry_delay": timedelta(minutes=5),
- "start_date": days_ago(1),
- "on_failure_callback": failure_callback
- }
- @dag(
- default_args=default_args,
- schedule_interval='@daily',
- catchup=False,
- max_active_runs=1,
- description='Объединенный DAG с использованием TaskGroup',
- tags=['etl', 'taskgroup']
- )
- def unified_dag():
- # Первый TaskGroup
- with TaskGroup('onec_on_update') as onec_group:
- check_task = PythonOperator(
- task_id='check_task',
- python_callable=checker,
- op_kwargs={'checkDAG': 'DATAMART-Telemarketing-on-update-2d'}
- )
- stage_app = PythonOperator(
- task_id='stage_app',
- python_callable=call_procedure,
- op_kwargs={"type_": "stage", "conn_id": "dwh_stage", "proc_name": "proc_get_onec_8222_date_001",
- "date_start": "{{ data_interval_start }}", "date_end": "{{ data_interval_end }}"},
- )
- stage_loan = PythonOperator(
- task_id='stage_loan',
- python_callable=call_procedure,
- op_kwargs={"type_": "stage", "conn_id": "dwh_stage", "proc_name": "proc_get_onec_8212_date_001",
- "date_start": "{{ data_interval_start }}", "date_end": "{{ data_interval_end }}"},
- )
- stage_anketa = PythonOperator(
- task_id='stage_anketa',
- python_callable=call_procedure,
- op_kwargs={"type_": "idrref", "conn_id": "dwh_stage", "proc_name": "proc_get_onec_anketa_idrref_001", "need_run": True},
- )
- check_task >> [stage_app, stage_loan] >> stage_anketa
- # Второй TaskGroup
- with TaskGroup('crm_on_update') as crm_group:
- stage_list = ['products', 'marketing_campaigns', 'autolists', 'communications']
- stage_tasks = []
- for task_name in stage_list:
- stage_task = PythonOperator(
- task_id=f'stage_{task_name}',
- python_callable=call_procedure,
- op_kwargs={"type_": "stage", "conn_id": "dwh_stage", "proc_name": f"proc_{task_name}",
- "date_start": "{{ data_interval_start }}", "date_end": "{{ data_interval_end }}"},
- )
- stage_tasks.append(stage_task)
- crm_stage_trigger = TriggerDagRunOperator(
- task_id='trigger_crm_dag',
- trigger_dag_id='CRM-ON-UPDATE'
- )
- stage_tasks >> crm_stage_trigger
- # Третий TaskGroup
- with TaskGroup('cc_on_update') as cc_group:
- stage_tasks = ['cc_member_attempt_history', 'cc_queue', 'cc_agent']
- for task_name in stage_tasks:
- cc_task = PythonOperator(
- task_id=f'stage_{task_name}',
- python_callable=call_procedure,
- op_kwargs={"type_": "stage", "conn_id": "dwh_stage", "proc_name": f"proc_{task_name}",
- "date_start": "{{ data_interval_start }}", "date_end": "{{ data_interval_end }}"},
- )
- cc_task
- cc_trigger = TriggerDagRunOperator(
- task_id='trigger_cc_dag',
- trigger_dag_id='CC-ON-UPDATE'
- )
- cc_group >> cc_trigger
- # Четвертый TaskGroup
- with TaskGroup('telemarketing_update') as telemarketing_group:
- tasks_list = ['tbl_r_loan_baskets', 'tbl_r_loan_baskets_total', 'tbl_r_Equifax_crm_new_d2']
- for task_name in tasks_list:
- telemarketing_task = PythonOperator(
- task_id=f'query_{task_name}',
- python_callable=run_query,
- op_kwargs={"conn_id": "dwh_gp_dm", "query": f"SELECT * FROM {task_name};", "name": task_name},
- )
- telemarketing_task
- # Главный поток
- onec_group >> crm_group >> cc_group >> telemarketing_group
- # Генерация DAG-а
- unified_dag = unified_dag()
- ```
- В этом примере объединены четыре DAG-а в один, используя `TaskGroup` для каждого. Главный поток задач (`onec_group`, `crm_group`, `cc_group`, `telemarketing_group`) выполняется последовательно, но вы можете изменить порядок или зависимости между группами по мере необходимости.
- Основные шаги:
- 1. Созданы `TaskGroup` для каждого DAG-а.
- 2. Внутри каждого `TaskGroup` создаются задачи и устанавливаются зависимости между ними.
- 3. Главный поток DAG-а связывает эти `TaskGroup` в нужной последовательности.
- Это должно объединить ваш код в один DAG с логической структурой, аналогичной отдельным DAG-ам.
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement