Advertisement
Evgeny_Baulin

Untitled

Aug 12th, 2024
17
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.19 KB | None | 0 0
  1. Для объединения четырех DAG-ов в один с использованием `TaskGroup`, можно следовать следующему подходу. Этот пример будет базироваться на объединении кода, который вы предоставили. Мы создадим один DAG, в который включим все необходимые `TaskGroup`, соответствующие отдельным DAG-ам.
  2.  
  3. ```python
  4. from airflow.decorators import dag
  5. from airflow import DAG
  6. from airflow.utils.dates import days_ago
  7. from datetime import datetime, timedelta
  8. from airflow.operators.python import PythonOperator
  9. from airflow.providers.postgres.operators.postgres import PostgresOperator
  10. from airflow.utils.task_group import TaskGroup
  11. from airflow.operators.trigger_dagrun import TriggerDagRunOperator
  12.  
  13. # Импорт необходимых функций и утилит
  14. from lib.utils import get_scripts_path, get_pg_sqlalchemy_engine
  15. from projects_scripts.helpers import failure_callback, send_failure_notifications, checker
  16.  
  17. # Общие параметры DAG-а
  18. default_args = {
  19. "retries": 2,
  20. "email_on_retry": False,
  21. "email_on_failure": True,
  22. "depends_on_past": False,
  23. "retry_delay": timedelta(minutes=5),
  24. "start_date": days_ago(1),
  25. "on_failure_callback": failure_callback
  26. }
  27.  
  28. @dag(
  29. default_args=default_args,
  30. schedule_interval='@daily',
  31. catchup=False,
  32. max_active_runs=1,
  33. description='Объединенный DAG с использованием TaskGroup',
  34. tags=['etl', 'taskgroup']
  35. )
  36. def unified_dag():
  37.  
  38. # Первый TaskGroup
  39. with TaskGroup('onec_on_update') as onec_group:
  40. check_task = PythonOperator(
  41. task_id='check_task',
  42. python_callable=checker,
  43. op_kwargs={'checkDAG': 'DATAMART-Telemarketing-on-update-2d'}
  44. )
  45.  
  46. stage_app = PythonOperator(
  47. task_id='stage_app',
  48. python_callable=call_procedure,
  49. op_kwargs={"type_": "stage", "conn_id": "dwh_stage", "proc_name": "proc_get_onec_8222_date_001",
  50. "date_start": "{{ data_interval_start }}", "date_end": "{{ data_interval_end }}"},
  51. )
  52. stage_loan = PythonOperator(
  53. task_id='stage_loan',
  54. python_callable=call_procedure,
  55. op_kwargs={"type_": "stage", "conn_id": "dwh_stage", "proc_name": "proc_get_onec_8212_date_001",
  56. "date_start": "{{ data_interval_start }}", "date_end": "{{ data_interval_end }}"},
  57. )
  58.  
  59. stage_anketa = PythonOperator(
  60. task_id='stage_anketa',
  61. python_callable=call_procedure,
  62. op_kwargs={"type_": "idrref", "conn_id": "dwh_stage", "proc_name": "proc_get_onec_anketa_idrref_001", "need_run": True},
  63. )
  64.  
  65. check_task >> [stage_app, stage_loan] >> stage_anketa
  66.  
  67. # Второй TaskGroup
  68. with TaskGroup('crm_on_update') as crm_group:
  69. stage_list = ['products', 'marketing_campaigns', 'autolists', 'communications']
  70.  
  71. stage_tasks = []
  72. for task_name in stage_list:
  73. stage_task = PythonOperator(
  74. task_id=f'stage_{task_name}',
  75. python_callable=call_procedure,
  76. op_kwargs={"type_": "stage", "conn_id": "dwh_stage", "proc_name": f"proc_{task_name}",
  77. "date_start": "{{ data_interval_start }}", "date_end": "{{ data_interval_end }}"},
  78. )
  79. stage_tasks.append(stage_task)
  80.  
  81. crm_stage_trigger = TriggerDagRunOperator(
  82. task_id='trigger_crm_dag',
  83. trigger_dag_id='CRM-ON-UPDATE'
  84. )
  85.  
  86. stage_tasks >> crm_stage_trigger
  87.  
  88. # Третий TaskGroup
  89. with TaskGroup('cc_on_update') as cc_group:
  90. stage_tasks = ['cc_member_attempt_history', 'cc_queue', 'cc_agent']
  91.  
  92. for task_name in stage_tasks:
  93. cc_task = PythonOperator(
  94. task_id=f'stage_{task_name}',
  95. python_callable=call_procedure,
  96. op_kwargs={"type_": "stage", "conn_id": "dwh_stage", "proc_name": f"proc_{task_name}",
  97. "date_start": "{{ data_interval_start }}", "date_end": "{{ data_interval_end }}"},
  98. )
  99. cc_task
  100.  
  101. cc_trigger = TriggerDagRunOperator(
  102. task_id='trigger_cc_dag',
  103. trigger_dag_id='CC-ON-UPDATE'
  104. )
  105.  
  106. cc_group >> cc_trigger
  107.  
  108. # Четвертый TaskGroup
  109. with TaskGroup('telemarketing_update') as telemarketing_group:
  110. tasks_list = ['tbl_r_loan_baskets', 'tbl_r_loan_baskets_total', 'tbl_r_Equifax_crm_new_d2']
  111.  
  112. for task_name in tasks_list:
  113. telemarketing_task = PythonOperator(
  114. task_id=f'query_{task_name}',
  115. python_callable=run_query,
  116. op_kwargs={"conn_id": "dwh_gp_dm", "query": f"SELECT * FROM {task_name};", "name": task_name},
  117. )
  118. telemarketing_task
  119.  
  120. # Главный поток
  121. onec_group >> crm_group >> cc_group >> telemarketing_group
  122.  
  123. # Генерация DAG-а
  124. unified_dag = unified_dag()
  125. ```
  126.  
  127. В этом примере объединены четыре DAG-а в один, используя `TaskGroup` для каждого. Главный поток задач (`onec_group`, `crm_group`, `cc_group`, `telemarketing_group`) выполняется последовательно, но вы можете изменить порядок или зависимости между группами по мере необходимости.
  128.  
  129. Основные шаги:
  130. 1. Созданы `TaskGroup` для каждого DAG-а.
  131. 2. Внутри каждого `TaskGroup` создаются задачи и устанавливаются зависимости между ними.
  132. 3. Главный поток DAG-а связывает эти `TaskGroup` в нужной последовательности.
  133.  
  134. Это должно объединить ваш код в один DAG с логической структурой, аналогичной отдельным DAG-ам.
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement