Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
from datetime import datetime, timezone
from pathlib import Path

# noinspection PyPackageRequirements
import pendulum
import pytest

from dependencies.constants import DBEntity
from dependencies.services.dataclasses import FileNameInfo
from dependencies.services.file_info_parser import parse_file_info
- DATE_FORMAT = 'YYYY-MM-DDTHH-mm-ss.SSS'
- def parse_epoch_from_str(date_str: str) -> int:
- return int(
- pendulum.from_format(date_str, DATE_FORMAT).replace(tzinfo=pendulum.UTC).timestamp() * 1000
- )
- def test_new_format():
- assert parse_file_info(
- file_path=Path('cre-1-1674809573-xlxhx.json.zip'),
- entity=DBEntity.CREATIVE,
- ) == FileNameInfo(
- host_hash='xlxhx',
- epoch=1674809573000,
- version=1,
- entity_code='cre',
- )
- assert parse_file_info(
- file_path=Path('cre-1-1674809573012-xlxhx.tmp'),
- entity=DBEntity.CREATIVE,
- ) == FileNameInfo(
- host_hash='xlxhx',
- epoch=1674809573012,
- version=1,
- entity_code='cre',
- )
- assert parse_file_info(
- file_path=Path('con-2-1674809573123-xlxhx.tmp'),
- entity=DBEntity.CREATIVE,
- ) == FileNameInfo(
- host_hash='xlxhx',
- epoch=1674809573123,
- version=2,
- entity_code='con',
- )
- def test_old_format():
- assert parse_file_info(
- file_path=Path('highload-external-service-new-c648ccd7-xlxhx_2022-11-28T07-19-34.017_2022-11-28T07-34-35.993_000030010.json.zip'), # noqa E501
- entity=DBEntity.INVOICE,
- ) == FileNameInfo(
- host_hash='xlxhx',
- epoch=parse_epoch_from_str('2022-11-28T07-19-34.017'),
- version=1,
- entity_code='inv',
- )
- assert parse_file_info(
- file_path=Path('highload-external-service-new-c648ccd7-xlxhx_2022-11-25T15-52-36.707Z_0007_2022-11-25T15-53-24.559813Z.zip'), # noqa E501
- entity=DBEntity.INVOICE_ENRICHMENT,
- ) == FileNameInfo(
- host_hash='xlxhx',
- epoch=parse_epoch_from_str('2022-11-25T15-52-36.707'),
- version=1,
- entity_code='inve',
- )
- assert parse_file_info(
- file_path=Path('highload-external-service-new-8648f4dc69-tx6wv_2022-12-06T09-15-33.871.tmp'),
- entity=DBEntity.PLATFORM,
- ) == FileNameInfo(
- host_hash='tx6wv',
- epoch=parse_epoch_from_str('2022-12-06T09-15-33.871'),
- version=1,
- entity_code='pla',
- )
- assert parse_file_info(
- file_path=Path('highload-external-service-new-8648f4dc69-tx6wv_2022-12-06T09-15-33.007597Z_0001.tmp'),
- entity=DBEntity.CONTRACT,
- ) == FileNameInfo(
- host_hash='tx6wv',
- epoch=parse_epoch_from_str('2022-12-06T09-15-33.007'),
- version=1,
- entity_code='con',
- )
- def test_unknown_format():
- with pytest.raises(AssertionError, match=r".* неизвестный формат имени"):
- parse_file_info(
- file_path=Path('wrong_file_name.tmp'),
- entity=DBEntity.CONTRACT,
- )
- -------------------------------------------------------------
- def get_ti_log_lines_with_specific_log_levels(
- log_levels: List[Union[int, str]],
- ti: TaskInstance,
- try_number: int,
- ) -> List[str]:
- """Получение и фильтрация логов из Airflow с заданными log_levels"""
- log_level_names = [logging.getLevelName(log_level) for log_level in log_levels]
- task_logs_data_generator = TaskLogReader().read_log_stream(
- ti=ti,
- try_number=try_number,
- metadata={},
- )
- filtered_log_lines = _parse_and_filtering_logs(
- logs_data_generator=task_logs_data_generator,
- log_level_names=log_level_names,
- )
- return filtered_log_lines
- ----------------------------------------------------------------
- def get_failed_tis_with_specific_tags(
- tags: List[str],
- log_levels: List[str],
- ) -> List[Dict[str, Any]]:
- """Получение информации о неуспешных TaskInstances и их логов с фильтрацией по log_levels"""
- context = get_current_context()
- logger.info(f'Полученные тэги для фильтрации: {tags}')
- begin_dttm, end_dttm = get_data_interval(context)
- logger.info(f'Период для мониторинга {begin_dttm=}, {end_dttm=}')
- failed_tis_info = []
- for ti in _get_failed_tis_by_period_with_specific_tags(tags=tags, begin_dttm=begin_dttm, end_dttm=end_dttm):
- task_info = {
- 'template_data': {
- 'run_id': ti.run_id,
- 'dag_id': ti.dag_id,
- 'task_id': ti.task_id,
- 'status': ti.state,
- 'log_url': ti.log_url.replace('localhost', '127.0.0.1'),
- 'try_number': ti.prev_attempted_tries,
- },
- }
- filtered_log_lines = get_ti_log_lines_with_specific_log_levels(
- log_levels=log_levels,
- ti=ti,
- try_number=ti.prev_attempted_tries,
- )
- if filtered_log_lines:
- task_info['attachment'] = {
- 'type': InputMediaType.DOCUMENT,
- 'name': LOG_FILE_NAME_TEMPLATE.format(
- dag_id=ti.dag_id,
- run_id=ti.run_id,
- task_id=ti.task_id,
- try_number=ti.prev_attempted_tries,
- ),
- 'content': '\n'.join(filtered_log_lines),
- }
- failed_tis_info.append(task_info)
- logger.info(f'Найдено {len(failed_tis_info)} задач, которые завершились с ошибкой')
- return failed_tis_info
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement