Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import logging
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple

from airflow import DAG
from airflow.models import DagRun, DagTag, TaskInstance, Log, Variable
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.session import create_session
from airflow.utils.state import State
# Module-level logger bound to the "airflow.task" logger name
# (presumably so messages end up in the task-instance log — confirm).
logger = logging.getLogger("airflow.task")
def get_failed_dags_with_specified_tags(
    tags: List[str], after_execution_date: datetime, session: Any
) -> List[Tuple[DagRun, Optional[TaskInstance]]]:
    """Return failed DAG runs whose DAG carries every one of the given tags.

    Args:
        tags: Tag names that a DAG must all have for its failed runs to be
            reported. An empty list matches every DAG.
        after_execution_date: Inclusive lower bound applied to
            ``DagRun.execution_date``.
        session: Database session used for every query issued here.

    Returns:
        A list of ``(dag_run, failed_task)`` pairs. ``failed_task`` is the
        failed task instance of that run with the latest ``end_date``, or
        ``None`` when the run has no task instance in the failed state.
    """
    # Build the required-tag set once instead of on every loop iteration.
    required_tags = set(tags)

    failed_dag_runs = (
        session.query(DagRun)
        .filter(
            DagRun.state == State.FAILED,
            DagRun.execution_date >= after_execution_date,
        )
        .all()
    )

    # Several failed runs usually belong to the same DAG; remember each DAG's
    # tag names so the DagTag table is queried once per DAG, not once per run.
    tag_names_by_dag_id: Dict[str, set] = {}

    failed_dags_with_tags = []
    for dag_run in failed_dag_runs:
        dag_tag_names = tag_names_by_dag_id.get(dag_run.dag_id)
        if dag_tag_names is None:
            dag_tag_names = {
                tag.name
                for tag in session.query(DagTag).filter(
                    DagTag.dag_id == dag_run.dag_id
                )
            }
            tag_names_by_dag_id[dag_run.dag_id] = dag_tag_names

        if not required_tags.issubset(dag_tag_names):
            continue

        # Match task instances to the run through run_id, the key shared by
        # both tables, rather than through the execution_date of the run.
        failed_task = (
            session.query(TaskInstance)
            .filter(
                TaskInstance.dag_id == dag_run.dag_id,
                TaskInstance.run_id == dag_run.run_id,
                TaskInstance.state == State.FAILED,
            )
            .order_by(TaskInstance.end_date.desc())
            .first()
        )
        failed_dags_with_tags.append((dag_run, failed_task))

    return failed_dags_with_tags
def print_failed_dags_with_specified_tags(**context: Any) -> None:
    """Log every DAG run that failed since the previous check and has the configured tags.

    Reads the tag list from ``context["params"]["tags"]`` and the previous
    check time from the ``last_check_time`` Airflow Variable, logs one line
    per matching failed run, and finally stores the current task instance's
    execution date as the new ``last_check_time``.

    Args:
        **context: Task context passed in by the operator; the keys
            ``"params"`` and ``"ti"`` are read.
    """
    tags = context["params"]["tags"]
    current_ti = context["ti"]

    last_checked_time = Variable.get("last_check_time", None, deserialize_json=True)
    logger.info(f"Последнее время обновления: {last_checked_time}")

    if last_checked_time:
        last_check_datetime = datetime.fromisoformat(last_checked_time)
        if last_check_datetime.tzinfo is None:
            # A naive stored value is interpreted as UTC so the bound handed
            # to the query is always timezone-aware.
            last_check_datetime = last_check_datetime.replace(tzinfo=timezone.utc)
    else:
        # First run: no stored check time yet. Use a timezone-aware lower
        # bound (the Unix epoch) instead of the naive ``datetime.min``.
        last_check_datetime = datetime(1970, 1, 1, tzinfo=timezone.utc)

    with create_session() as session:
        failed_dags = get_failed_dags_with_specified_tags(
            tags, last_check_datetime, session
        )
        logger.info(f"Количество DAG's со статусов failed: {len(failed_dags)}")
        for dag_run, failed_task in failed_dags:
            if failed_task is None:
                # The lookup found no task instance in the failed state for
                # this run; report the run without task details.
                task_details = "Task ID: n/a, Status: n/a Log URL: n/a"
            else:
                task_details = (
                    f"Task ID: {failed_task.task_id}, Status: {failed_task.state} "
                    f"Log URL: {failed_task.log_url}"
                )
            logger.info(
                f"DAG ID: {dag_run.dag_id}, Run ID: {dag_run.run_id}, Execution Date: {dag_run.execution_date}, "
                f"{task_details}"
            )

    # Advance the stored check time only after the check itself succeeded, so
    # an error above does not silently skip the failures of this window.
    new_check_time = current_ti.execution_date.isoformat()
    Variable.set("last_check_time", new_check_time, serialize_json=True)
    logger.info(f"Время последнего обновления успешно изменено: {new_check_time}")
# Keyword arguments for the monitoring DAG: it is scheduled every five
# minutes from 2023-05-10 with catchup disabled and is tagged "test".
dag_params = {
    "dag_id": "routine_monitor_failed_dags",
    "description": "DAG, который отслеживает неудавшиеся запуски других DAG с указанными тегами и выводит информацию о них",
    "start_date": datetime(2023, 5, 10),
    "schedule_interval": timedelta(minutes=5),
    "catchup": False,
    "tags": ["test"],
}

with DAG(**dag_params) as dag:  # type: ignore
    start = EmptyOperator(task_id="start")

    # ``provide_context`` is omitted: it is a legacy Airflow 1.x argument
    # that Airflow 2 no longer needs. ``dag=dag`` is omitted as well because
    # operators created inside the ``with DAG(...)`` block are attached to
    # that DAG.
    print_failed_dags_task = PythonOperator(
        task_id="print_failed_dags_with_specified_tags",
        python_callable=print_failed_dags_with_specified_tags,
        params={"tags": ["test"]},
    )

    finish = EmptyOperator(task_id="finish")

    start >> print_failed_dags_task >> finish
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement