from pathlib import Path

# noinspection PyPackageRequirements
import pendulum
import pytest

from dependencies.constants import DBEntity
from dependencies.services.dataclasses import FileNameInfo
from dependencies.services.file_info_parser import parse_file_info


DATE_FORMAT = 'YYYY-MM-DDTHH-mm-ss.SSS'


def parse_epoch_from_str(date_str: str) -> int:
    """Convert an old-format timestamp string into epoch milliseconds (UTC)."""
    return int(
        pendulum.from_format(date_str, DATE_FORMAT).replace(tzinfo=pendulum.UTC).timestamp() * 1000
    )


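# Sanity check (not part of the original suite): '2022-11-28T07-19-34.017' is
# 2022-11-28 07:19:34.017 UTC, i.e. 1669619974017 ms since the epoch, so the
# helper should reproduce that value exactly.
def test_parse_epoch_from_str():
    assert parse_epoch_from_str('2022-11-28T07-19-34.017') == 1669619974017

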
def test_new_format():
    assert parse_file_info(
        file_path=Path('cre-1-1674809573-xlxhx.json.zip'),
        entity=DBEntity.CREATIVE,
    ) == FileNameInfo(
        host_hash='xlxhx',
        epoch=1674809573000,
        version=1,
        entity_code='cre',
    )

    assert parse_file_info(
        file_path=Path('cre-1-1674809573012-xlxhx.tmp'),
        entity=DBEntity.CREATIVE,
    ) == FileNameInfo(
        host_hash='xlxhx',
        epoch=1674809573012,
        version=1,
        entity_code='cre',
    )

    assert parse_file_info(
        file_path=Path('con-2-1674809573123-xlxhx.tmp'),
        entity=DBEntity.CREATIVE,
    ) == FileNameInfo(
        host_hash='xlxhx',
        epoch=1674809573123,
        version=2,
        entity_code='con',
    )


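# Judging by the fixtures above, the new file name format appears to be
# '<entity_code>-<version>-<epoch>-<host_hash>.<ext>'; the first case suggests
# a 10-digit (seconds) epoch is scaled to milliseconds by the parser.

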
def test_old_format():
    assert parse_file_info(
        file_path=Path('highload-external-service-new-c648ccd7-xlxhx_2022-11-28T07-19-34.017_2022-11-28T07-34-35.993_000030010.json.zip'),  # noqa: E501
        entity=DBEntity.INVOICE,
    ) == FileNameInfo(
        host_hash='xlxhx',
        epoch=parse_epoch_from_str('2022-11-28T07-19-34.017'),
        version=1,
        entity_code='inv',
    )

    assert parse_file_info(
        file_path=Path('highload-external-service-new-c648ccd7-xlxhx_2022-11-25T15-52-36.707Z_0007_2022-11-25T15-53-24.559813Z.zip'),  # noqa: E501
        entity=DBEntity.INVOICE_ENRICHMENT,
    ) == FileNameInfo(
        host_hash='xlxhx',
        epoch=parse_epoch_from_str('2022-11-25T15-52-36.707'),
        version=1,
        entity_code='inve',
    )

    assert parse_file_info(
        file_path=Path('highload-external-service-new-8648f4dc69-tx6wv_2022-12-06T09-15-33.871.tmp'),
        entity=DBEntity.PLATFORM,
    ) == FileNameInfo(
        host_hash='tx6wv',
        epoch=parse_epoch_from_str('2022-12-06T09-15-33.871'),
        version=1,
        entity_code='pla',
    )

    assert parse_file_info(
        file_path=Path('highload-external-service-new-8648f4dc69-tx6wv_2022-12-06T09-15-33.007597Z_0001.tmp'),
        entity=DBEntity.CONTRACT,
    ) == FileNameInfo(
        host_hash='tx6wv',
        epoch=parse_epoch_from_str('2022-12-06T09-15-33.007'),
        version=1,
        entity_code='con',
    )


def test_unknown_format():
    # parse_file_info raises with a Russian message ('unknown file name
    # format'), so the match pattern below must stay in Russian.
    with pytest.raises(AssertionError, match=r'.* неизвестный формат имени'):
        parse_file_info(
            file_path=Path('wrong_file_name.tmp'),
            entity=DBEntity.CONTRACT,
        )

-------------------------------------------------------------

import logging
from typing import Any, Dict, List, Union

from airflow.models import TaskInstance
from airflow.utils.log.log_reader import TaskLogReader

logger = logging.getLogger(__name__)


def get_ti_log_lines_with_specific_log_levels(
    log_levels: List[Union[int, str]],
    ti: TaskInstance,
    try_number: int,
) -> List[str]:
    """Fetch a TaskInstance's logs from Airflow and keep only the given log_levels."""
    # logging.getLevelName() maps ints to names; string levels are kept as-is,
    # since passing a name back through getLevelName() would return its numeric
    # value and break the substring filtering below.
    log_level_names = [
        logging.getLevelName(log_level) if isinstance(log_level, int) else log_level
        for log_level in log_levels
    ]

    task_logs_data_generator = TaskLogReader().read_log_stream(
        ti=ti,
        try_number=try_number,
        metadata={},
    )

    filtered_log_lines = _parse_and_filtering_logs(
        logs_data_generator=task_logs_data_generator,
        log_level_names=log_level_names,
    )

    return filtered_log_lines

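
# A minimal sketch of the helper used above (its real definition is not in this
# paste): it assumes TaskLogReader.read_log_stream yields newline-separated
# string chunks and that each log line embeds its level name, e.g. '... ERROR ...'.
from typing import Iterator


def _parse_and_filtering_logs(
    logs_data_generator: Iterator[str],
    log_level_names: List[str],
) -> List[str]:
    filtered_log_lines = []
    for log_chunk in logs_data_generator:
        for line in log_chunk.splitlines():
            # Naive substring match; a stricter version could parse the line
            # into timestamp/level/message fields first.
            if any(level_name in line for level_name in log_level_names):
                filtered_log_lines.append(line)
    return filtered_log_lines
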
----------------------------------------------------------------

# NB: get_data_interval, _get_failed_tis_by_period_with_specific_tags,
# get_current_context, InputMediaType and LOG_FILE_NAME_TEMPLATE are
# project-level helpers/constants whose definitions are not part of this paste.
def get_failed_tis_with_specific_tags(
    tags: List[str],
    log_levels: List[str],
) -> List[Dict[str, Any]]:
    """Collect info on failed TaskInstances and their logs filtered by log_levels."""
    context = get_current_context()

    logger.info(f'Tags received for filtering: {tags}')

    begin_dttm, end_dttm = get_data_interval(context)
    logger.info(f'Monitoring period {begin_dttm=}, {end_dttm=}')

    failed_tis_info = []
    for ti in _get_failed_tis_by_period_with_specific_tags(tags=tags, begin_dttm=begin_dttm, end_dttm=end_dttm):
        task_info = {
            'template_data': {
                'run_id': ti.run_id,
                'dag_id': ti.dag_id,
                'task_id': ti.task_id,
                'status': ti.state,
                'log_url': ti.log_url.replace('localhost', '127.0.0.1'),
                'try_number': ti.prev_attempted_tries,
            },
        }

        filtered_log_lines = get_ti_log_lines_with_specific_log_levels(
            log_levels=log_levels,
            ti=ti,
            try_number=ti.prev_attempted_tries,
        )
        if filtered_log_lines:
            task_info['attachment'] = {
                'type': InputMediaType.DOCUMENT,
                'name': LOG_FILE_NAME_TEMPLATE.format(
                    dag_id=ti.dag_id,
                    run_id=ti.run_id,
                    task_id=ti.task_id,
                    try_number=ti.prev_attempted_tries,
                ),
                'content': '\n'.join(filtered_log_lines),
            }

        failed_tis_info.append(task_info)

    logger.info(f'Found {len(failed_tis_info)} tasks that finished with an error')

    return failed_tis_info
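
# Hypothetical wiring (not from the original paste), assuming this module is
# invoked from a monitoring DAG via Airflow's TaskFlow API; get_current_context()
# only works inside a running task, hence the @task wrapper. The tag and level
# values are illustrative.
from airflow.decorators import task


@task
def collect_failed_tasks() -> List[Dict[str, Any]]:
    return get_failed_tis_with_specific_tags(
        tags=['critical'],      # example tag, an assumption
        log_levels=['ERROR'],   # surface only ERROR lines in the attachment
    )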