Advertisement
1nikitas

Untitled

May 10th, 2023
124
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.84 KB | None | 0 0
import logging
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple

from airflow import DAG
from airflow.models import DagRun, DagTag, TaskInstance, Log, Variable
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.session import create_session
from airflow.utils.state import State
  9.  
  10.  
  11. logger = logging.getLogger("airflow.task")
  12.  
  13.  
  14. def get_failed_dags_with_specified_tags(
  15. tags: List[str], after_execution_date: datetime, session: Any
  16. ) -> List[Tuple[DagRun, TaskInstance]]:
  17. """Получение списка неудачных запусков DAG, содержащих указанные теги"""
  18. failed_dag_runs = (
  19. session.query(DagRun)
  20. .filter(
  21. DagRun.state == State.FAILED,
  22. DagRun.execution_date >= after_execution_date,
  23. )
  24. .all()
  25. )
  26.  
  27. failed_dags_with_tags = []
  28.  
  29. for dag_run in failed_dag_runs:
  30. dag_tags = session.query(DagTag).filter(DagTag.dag_id == dag_run.dag_id).all()
  31. if set(tags).issubset({tag.name for tag in dag_tags}):
  32. failed_task = (
  33. session.query(TaskInstance)
  34. .filter(
  35. TaskInstance.dag_id == dag_run.dag_id,
  36. TaskInstance.execution_date == dag_run.execution_date,
  37. TaskInstance.state == State.FAILED,
  38. )
  39. .order_by(TaskInstance.end_date.desc())
  40. .first()
  41. )
  42. failed_dags_with_tags.append((dag_run, failed_task))
  43.  
  44. return failed_dags_with_tags
  45.  
  46.  
  47. def print_failed_dags_with_specified_tags(**context: Dict[str, Any]) -> None:
  48. """Вывод информации о неудачных запусках DAG с указанными тегами"""
  49. tags = context["params"]["tags"]
  50. current_ti = context["ti"]
  51. last_checked_time = Variable.get("last_check_time", None, deserialize_json=True)
  52.  
  53. logger.info(f"Последнее время обновления: {last_checked_time}")
  54.  
  55. if last_checked_time:
  56. last_check_datetime = datetime.fromisoformat(last_checked_time)
  57. else:
  58. last_check_datetime = datetime.min
  59.  
  60. Variable.set(
  61. "last_check_time", current_ti.execution_date.isoformat(), serialize_json=True
  62. )
  63. logger.info(f"Время последнего обновления успешно изменено: {last_checked_time}")
  64.  
  65. with create_session() as session:
  66. failed_dags = get_failed_dags_with_specified_tags(
  67. tags, last_check_datetime, session
  68. )
  69. logger.info(f"Количество DAG's со статусов failed: {len(failed_dags)}")
  70. for dag_run, failed_task in failed_dags:
  71. logger.info(
  72. f"DAG ID: {dag_run.dag_id}, Run ID: {dag_run.run_id}, Execution Date: {dag_run.execution_date}, "
  73. f"Task ID: {failed_task.task_id}, Status: {failed_task.state} Log URL: {failed_task.log_url}"
  74. )
  75.  
  76.  
  77. dag_params = {
  78. "dag_id": "routine_monitor_failed_dags",
  79. "description": "DAG, который отслеживает неудавшиеся запуски других DAG с указанными тегами и выводит информацию о них",
  80. "start_date": datetime(2023, 5, 10),
  81. "schedule_interval": timedelta(minutes=5),
  82. "catchup": False,
  83. "tags": ["test"],
  84. }
  85.  
  86.  
  87. with DAG(**dag_params) as dag: # type: ignore
  88. start = EmptyOperator(task_id="start")
  89.  
  90. print_failed_dags_task = PythonOperator(
  91. task_id="print_failed_dags_with_specified_tags",
  92. python_callable=print_failed_dags_with_specified_tags,
  93. provide_context=True,
  94. params={"tags": ["test"]},
  95. dag=dag,
  96. )
  97.  
  98. finish = EmptyOperator(task_id="finish")
  99.  
  100. start >> print_failed_dags_task >> finish
  101.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement