Advertisement
1nikitas

Untitled

Apr 21st, 2023
137
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.01 KB | None | 0 0
  from datetime import datetime
  from typing import Dict

  from airflow import DAG
  from airflow.operators.empty import EmptyOperator
  from airflow.operators.python import PythonOperator

  from dependencies.services.telegram import send_telegram_message, TELEGRAM_CONN_ID
  9.  
  10.  
  # Keyword arguments unpacked into the DAG constructor (`DAG(**dag_params)`).
  dag_params = {
      "dag_id": "test_telegram_dag",
      "description": "Тестовый DAG для отправки в Telegram",
      # No schedule is configured for this DAG.
      "schedule_interval": None,
      "start_date": datetime(2021, 1, 1),
      "max_active_tasks": 1,
      "max_active_runs": 1,
      "catchup": False,
      "tags": ["test", "telegram"],
  }
  21.  
  22.  
  def get_test_data() -> Dict:
      """Return hard-coded test data for rendering the message template.

      Returns:
          A dict with a ``name`` string and a ``tables_info`` list; each
          entry of the list holds a table name plus its row count before
          and now.
      """
      return {
          "name": "Nikita",
          "tables_info": [
              {
                  "table_name": "allrp.dm_contract_report",
                  "table_rows_count_before": 5668211,
                  "table_rows_count_now": 5681478,
              },
              {
                  "table_name": "allrp.dm_platform_report",
                  "table_rows_count_before": 5826149,
                  "table_rows_count_now": 5835724,
              },
              {
                  "table_name": "allrp.dm_advertiser_report",
                  "table_rows_count_before": 11506417,
                  "table_rows_count_now": 11517276,
              },
              {
                  "table_name": "allrp.dm_invoice_main_report",
                  "table_rows_count_before": 6956874,
                  "table_rows_count_now": 6961791,
              },
              {
                  "table_name": "main.ds_fct_creative_distributed",
                  "table_rows_count_before": 17546031329,
                  "table_rows_count_now": 17691349237,
              },
              {
                  "table_name": "main.ds_fct_show_distributed",
                  "table_rows_count_before": 35316416342,
                  "table_rows_count_now": 35316713423,
              },
          ],
      }
  60.  
  61.  
  # Test DAG: build sample data, then send two Telegram messages (one from an
  # inline template string, one from a template file).
  with DAG(**dag_params) as dag:  # type: ignore
      start = EmptyOperator(task_id="start")

      # Runs get_test_data; presumably its return value is what the
      # file-template task later looks up via "template_data_task_id" -
      # TODO confirm against send_telegram_message.
      get_data = PythonOperator(
          task_id="get_test_data",
          python_callable=get_test_data,
      )

      # NOTE(review): op_kwargs is a templated field of PythonOperator, so
      # Airflow may try to render "{{ name }}" itself before the callable
      # runs - confirm this is the intended behaviour.
      send_simple_message = PythonOperator(
          task_id="send_simple_message_telegram",
          python_callable=send_telegram_message,
          op_kwargs={
              "conn_id": TELEGRAM_CONN_ID,
              "template_message": "Привет, '{{ name }}'! Добро пожаловать в ЕРИР!",
          },
      )

      # Same callable, but given a template file name and the task id of
      # the data-producing task instead of an inline message.
      send_message_using_file_template = PythonOperator(
          task_id="send_message_telegram_using_file_template",
          python_callable=send_telegram_message,
          op_kwargs={
              "conn_id": TELEGRAM_CONN_ID,
              "template_path": "template_airflow_info.j2",
              "template_data_task_id": "get_test_data",
          },
      )

      finish = EmptyOperator(task_id="finish")

      # Strictly linear dependency chain.
      (
          start
          >> get_data
          >> send_simple_message
          >> send_message_using_file_template
          >> finish
      )
  96.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement