Advertisement
1nikitas

Untitled

May 10th, 2023
127
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.98 KB | None | 0 0
  1. TypeError: get_failed_dags_with_specified_tags() missing 2 required positional arguments: 'tags' and 'after_execution_date'
  2.  
  3.  
  4. import logging
  5. from datetime import datetime, timedelta
  6. from typing import List, Tuple
  7. from airflow import DAG
  8. from airflow.models import DagRun, DagTag, TaskInstance, Variable
  9. from airflow.operators.empty import EmptyOperator
  10. from airflow.operators.python import PythonOperator
  11. from airflow.utils.state import State
  12. from airflow.utils.session import create_session
  13.  
  14.  
  15. logger = logging.getLogger("airflow.task")
  16.  
  17.  
  18. def get_last_checked_time() -> datetime:
  19. """Получает последнее время проверки неудачных запусков DAG"""
  20. last_checked_time = Variable.get("last_check_time", None, deserialize_json=True)
  21. if last_checked_time:
  22. return datetime.fromisoformat(last_checked_time)
  23. else:
  24. return datetime.min
  25.  
  26.  
  27. def update_last_checked_time(execution_date: datetime):
  28. """Обновляет время последней проверки неудачных запусков DAG"""
  29. Variable.set("last_check_time", execution_date.isoformat(), serialize_json=True)
  30. logger.info("Время последней проверки успешно обновлено")
  31.  
  32.  
  33. def get_failed_dags_with_specified_tags(tags: List[str], after_execution_date: datetime) -> List[Tuple[DagRun, TaskInstance]]:
  34. """Получает список неудачных запусков DAG с указанными тегами"""
  35. with create_session() as session:
  36. failed_dag_runs = (
  37. session.query(DagRun)
  38. .filter(
  39. DagRun.state == State.FAILED,
  40. DagRun.execution_date >= after_execution_date,
  41. )
  42. .all()
  43. )
  44.  
  45. failed_dags_with_tags = []
  46. for dag_run in failed_dag_runs:
  47. dag_tags = session.query(DagTag).filter(DagTag.dag_id == dag_run.dag_id).all()
  48. if set(tags).issubset({tag.name for tag in dag_tags}):
  49. failed_task = (
  50. session.query(TaskInstance)
  51. .filter(
  52. TaskInstance.dag_id == dag_run.dag_id,
  53. TaskInstance.execution_date == dag_run.execution_date,
  54. TaskInstance.state == State.FAILED,
  55. )
  56. .order_by(TaskInstance.end_date.desc())
  57. .first()
  58. )
  59. failed_dags_with_tags.append((dag_run, failed_task))
  60.  
  61. return failed_dags_with_tags
  62.  
  63.  
  64. def log_failed_dags_info(failed_dags: List[Tuple[DagRun, TaskInstance]]):
  65. """Выводит информацию о неудачных запусках DAG в журнале Airflow"""
  66. logger.info(f"Количество DAG's со статусом failed: {len(failed_dags)}")
  67. for dag_run, failed_task in failed_dags:
  68. logger.info(
  69. f"DAG ID: {dag_run.dag_id}, Run ID: {dag_run.run_id}, Execution Date: {dag_run.execution_date}, "
  70. f"Task ID: {failed_task.task_id}, Status: {failed_task.state} Log URL: {failed_task.log_url}"
  71. )
  72.  
  73.  
  74. dag_params = {
  75. "dag_id": "routine_monitor_failed_dags",
  76. "description": "DAG, который отслеживает неудавшиеся запуски других DAG с указанными тегами и выводит информацию о них",
  77. "start_date": datetime(2023, 5, 10),
  78. "schedule_interval": timedelta(minutes=5),
  79. "catchup": False,
  80. "tags": ["test"],
  81. }
  82.  
  83.  
  84. with DAG(**dag_params) as dag: # type: ignore
  85. start = EmptyOperator(task_id="start")
  86.  
  87. print_failed_dags_task = PythonOperator(
  88. task_id="print_failed_dags_with_specified_tags",
  89. python_callable=get_failed_dags_with_specified_tags,
  90. provide_context=True,
  91. params={"tags": ["test"]},
  92. dag=dag,
  93. )
  94.  
  95. finish = EmptyOperator(task_id="finish")
  96.  
  97. start >> print_failed_dags_task >> finish
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement