Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- TypeError: get_failed_dags_with_specified_tags() missing 2 required positional arguments: 'tags' and 'after_execution_date'
- import logging
- from datetime import datetime, timedelta
- from typing import List, Tuple
- from airflow import DAG
- from airflow.models import DagRun, DagTag, TaskInstance, Variable
- from airflow.operators.empty import EmptyOperator
- from airflow.operators.python import PythonOperator
- from airflow.utils.state import State
- from airflow.utils.session import create_session
- logger = logging.getLogger("airflow.task")
def get_last_checked_time() -> datetime:
    """Return the timestamp of the most recent failed-DAG check.

    Reads the ``last_check_time`` Airflow Variable (stored as a JSON
    string); falls back to ``datetime.min`` when the variable has never
    been set.
    """
    stored = Variable.get("last_check_time", None, deserialize_json=True)
    if not stored:
        # First ever run: no checkpoint yet, look at the whole history.
        return datetime.min
    return datetime.fromisoformat(stored)
def update_last_checked_time(execution_date: datetime):
    """Persist *execution_date* as the new last-check checkpoint.

    Writes the ISO-8601 representation into the ``last_check_time``
    Airflow Variable, JSON-serialized to match ``get_last_checked_time``.
    """
    serialized = execution_date.isoformat()
    Variable.set("last_check_time", serialized, serialize_json=True)
    logger.info("Время последней проверки успешно обновлено")
def get_failed_dags_with_specified_tags(tags: List[str], after_execution_date: datetime) -> List[Tuple[DagRun, TaskInstance]]:
    """Collect failed DAG runs whose DAGs carry all of the given tags.

    Only runs with ``execution_date >= after_execution_date`` are
    considered.  For each matching run, the most recently finished failed
    task instance is looked up as well (may be ``None`` if no task
    instance is in the FAILED state).

    :param tags: tag names that must ALL be present on the DAG
    :param after_execution_date: cutoff execution date (inclusive)
    :return: list of ``(DagRun, TaskInstance)`` pairs
    """
    required = set(tags)
    results: List[Tuple[DagRun, TaskInstance]] = []
    with create_session() as session:
        failed_runs = (
            session.query(DagRun)
            .filter(
                DagRun.state == State.FAILED,
                DagRun.execution_date >= after_execution_date,
            )
            .all()
        )
        for run in failed_runs:
            tag_rows = session.query(DagTag).filter(DagTag.dag_id == run.dag_id).all()
            present = {row.name for row in tag_rows}
            if not required.issubset(present):
                continue
            # Latest-finishing failed task of this run, if any.
            last_failed_task = (
                session.query(TaskInstance)
                .filter(
                    TaskInstance.dag_id == run.dag_id,
                    TaskInstance.execution_date == run.execution_date,
                    TaskInstance.state == State.FAILED,
                )
                .order_by(TaskInstance.end_date.desc())
                .first()
            )
            results.append((run, last_failed_task))
    return results
def log_failed_dags_info(failed_dags: List[Tuple[DagRun, TaskInstance]]):
    """Write a summary of failed DAG runs to the Airflow task log.

    :param failed_dags: pairs of ``(DagRun, TaskInstance)`` as produced by
        ``get_failed_dags_with_specified_tags``.  The task instance may be
        ``None`` — the producer uses ``.first()`` and a FAILED run does not
        guarantee a task in state FAILED (e.g. all tasks upstream_failed).
    """
    # Lazy %-style args: the message is only formatted if the level is enabled.
    logger.info("Количество DAG's со статусом failed: %s", len(failed_dags))
    for dag_run, failed_task in failed_dags:
        if failed_task is None:
            # Original code crashed here with AttributeError on None;
            # log what we know about the run instead.
            logger.info(
                "DAG ID: %s, Run ID: %s, Execution Date: %s, no failed task instance found",
                dag_run.dag_id, dag_run.run_id, dag_run.execution_date,
            )
            continue
        logger.info(
            "DAG ID: %s, Run ID: %s, Execution Date: %s, Task ID: %s, Status: %s Log URL: %s",
            dag_run.dag_id, dag_run.run_id, dag_run.execution_date,
            failed_task.task_id, failed_task.state, failed_task.log_url,
        )
# DAG-level configuration kept in one dict so the DAG() call stays short.
dag_params = {
    "dag_id": "routine_monitor_failed_dags",
    "description": "DAG, который отслеживает неудавшиеся запуски других DAG с указанными тегами и выводит информацию о них",
    "start_date": datetime(2023, 5, 10),
    "schedule_interval": timedelta(minutes=5),
    "catchup": False,
    "tags": ["test"],
}


def _check_and_log_failed_dags(tags: List[str]):
    """Task entry point: find failed runs since the last check, log them,
    then advance the checkpoint.

    Fixes the original TypeError: get_failed_dags_with_specified_tags()
    was wired directly as python_callable with a ``params=`` dict, so it
    was invoked without its two required positional arguments (``params``
    is for Jinja templating, not call arguments — ``op_kwargs`` is).
    """
    # Capture the cutoff BEFORE the query so runs failing during the
    # query are not skipped on the next iteration.
    check_started_at = datetime.now()
    failed = get_failed_dags_with_specified_tags(tags, get_last_checked_time())
    log_failed_dags_info(failed)
    update_last_checked_time(check_started_at)


with DAG(**dag_params) as dag:  # type: ignore
    start = EmptyOperator(task_id="start")
    print_failed_dags_task = PythonOperator(
        task_id="print_failed_dags_with_specified_tags",
        python_callable=_check_and_log_failed_dags,
        # provide_context is deprecated/ignored in Airflow 2 and the
        # callable takes no context kwargs; dag= is implied by the
        # enclosing `with DAG(...)` block.
        op_kwargs={"tags": ["test"]},
    )
    finish = EmptyOperator(task_id="finish")
    start >> print_failed_dags_task >> finish
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement