Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from datetime import datetime
- from typing import Dict
- from airflow import DAG
- from airflow.operators.empty import EmptyOperator
- from airflow.operators.python import PythonOperator
- from dependencies.services.telegram import send_telegram_message, TELEGRAM_CONN_ID
- dag_params = {
- "dag_id": "test_telegram_dag",
- "description": "Тестовый DAG для отправки в Telegram",
- "schedule_interval": None,
- "start_date": datetime(2021, 1, 1),
- 'max_active_tasks': 1,
- 'max_active_runs': 1,
- 'catchup': False,
- 'tags': ['test', 'telegram'],
- }
- def get_test_data() -> Dict:
- """Тестовые данные для рендера шаблона"""
- return {
- "name": "Nikita",
- "tables_info": [
- {
- "table_name": "allrp.dm_contract_report",
- "table_rows_count_before": 5668211,
- "table_rows_count_now": 5681478,
- },
- {
- "table_name": "allrp.dm_platform_report",
- "table_rows_count_before": 5826149,
- "table_rows_count_now": 5835724,
- },
- {
- "table_name": "allrp.dm_advertiser_report",
- "table_rows_count_before": 11506417,
- "table_rows_count_now": 11517276,
- },
- {
- "table_name": "allrp.dm_invoice_main_report",
- "table_rows_count_before": 6956874,
- "table_rows_count_now": 6961791,
- },
- {
- "table_name": "main.ds_fct_creative_distributed",
- "table_rows_count_before": 17546031329,
- "table_rows_count_now": 17691349237,
- },
- {
- "table_name": "main.ds_fct_show_distributed",
- "table_rows_count_before": 35316416342,
- "table_rows_count_now": 35316713423,
- },
- ],
- }
- with DAG(**dag_params) as dag: # type: ignore
- start = EmptyOperator(task_id="start")
- get_data = PythonOperator(
- task_id="get_test_data",
- python_callable=get_test_data,
- )
- send_simple_message = PythonOperator(
- task_id="send_simple_message_telegram",
- python_callable=send_telegram_message,
- op_kwargs={
- "conn_id": TELEGRAM_CONN_ID,
- "template_message": "Привет, '{{ name }}'! Добро пожаловать в ЕРИР!",
- },
- )
- send_message_using_file_template = PythonOperator(
- task_id="send_message_telegram_using_file_template",
- python_callable=send_telegram_message,
- op_kwargs={
- "conn_id": TELEGRAM_CONN_ID,
- "template_path": "template_airflow_info.j2",
- "template_data_task_id": "get_test_data",
- },
- )
- finish = EmptyOperator(task_id="finish")
- start >> \
- get_data >> \
- send_simple_message >> \
- send_message_using_file_template >> \
- finish
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement