Advertisement
LilChicha174

Untitled

Jan 19th, 2025
62
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 11.60 KB | None | 0 0
  1.  
  2. # Написать свой reverse через генератор
  3.  
  4. # Условие задачи
  5.  
  6. # - дан список из 1 миллиона URL, которые требуется опросить
  7.  
  8. # - каждый запрос возвращает несколько значений item_id
  9.  
  10. # - с полученными данными необходимо провести опрос трёх независимых сервисов
  11.  
  12. # - из ответов этих сервисов, следует сформировать итоговое значение с использованием функции business_logic
  13.  
  14. # - все результаты следует сохранить в итоговый список
  15.  
  16.  
  17.  
  18.  
  19. from typing import List
  20.  
  21.  
  22.  
  23. # format: url, request_data
  24.  
  25. # sample size: 10**6
  26.  
  27. requests_samples = [
  28.  
  29. ('http://some-service/getItems/', {'user_id': 100}), # вернет {'item_ids': [1, 2, 3]}
  30.  
  31. ('http://some-service/getItems/', {'user_id': 101}),
  32.  
  33. ...
  34.  
  35. ]
  36.  
  37.  
  38.  
  39. service_1_url = 'http://service1/fillItems/'
  40.  
  41. service_2_url = 'http://service2/scoreItems/'
  42.  
  43. service_3_url = 'http://service3/logItems/'
  44.  
  45.  
  46.  
  47. def business_logic(service1_response, service2_response, service3_response) -> dict:
  48.  
  49. # эта функция не делает сетевых вызовов, только обрабатывает ответы
  50.  
  51. # считайте, что она уже написана
  52.  
  53. return {}
  54.  
  55.  
  56.  
  57. pool = multiproccessing.Pool(processes=os.processes.cpu_count() - 1)
  58.  
  59. queue = asyncio.Queue()
  60.  
  61.  
  62.  
  63. '''
  64.  
  65. # схема
  66.  
  67. 1. ('http://some-service/getItems/', {'user_id': 100}) -> # вернет item_ids: [1, 2, 3]
  68.  
  69. 2. в любом порядке и независимо опрашиваем service_1-2-3_url с item_ids
  70.  
  71. 3. собираем результат при помощи business_logic
  72.  
  73. 4. вы великолепны! повторить миллион раз.
  74. '''
  75.  
  76.  
  77.  
  78.  
  79.  
  80. -------------------------------------------------------------------------------------------------------------------------------
  81.  
  82.  
  83.  
  84.  
  85.  
  86.  
  87.  
  88.  
  89.  
  90.  
  91.  
  92. import asyncio
  93. import aiohttp
  94. from typing import List, Any
  95.  
  96.  
  97. # ----------- 1. Собственный reverse через генератор -----------
  98. def my_reverse(iterable):
  99.     """
  100.    Генератор, возвращающий элементы iterable в обратном порядке.
  101.    Пример:
  102.        my_reverse([1, 2, 3]) -> 3, 2, 1
  103.    """
  104.     for i in range(len(iterable) - 1, -1, -1):
  105.         yield iterable[i]
  106.  
  107.  
  108. # ----------- 2. Исходные данные / сервисы -----------
  109. requests_samples = [
  110.     ('http://some-service/getItems/', {'user_id': 100}),
  111.     ('http://some-service/getItems/', {'user_id': 101}),
  112.     ('http://some-service/getItems/', {'user_id': 102}),
  113.     # ... и т. д. до 10**6
  114. ]
  115.  
  116. service_1_url = 'http://service1/fillItems/'
  117. service_2_url = 'http://service2/scoreItems/'
  118. service_3_url = 'http://service3/logItems/'
  119.  
  120.  
  121. def business_logic(service1_response, service2_response, service3_response) -> dict:
  122.     """
  123.    Псевдо-функция, которая комбинирует ответы трёх сервисов в единый результат.
  124.    Считаем, что внутри никакой сети нет, просто обработка данных.
  125.    """
  126.     return {
  127.         "final_data": (
  128.             service1_response.get('someKey', 0)
  129.             + service2_response.get('anotherKey', 0)
  130.             + service3_response.get('thirdKey', 0)
  131.         )
  132.     }
  133.  
  134.  
  135. # ----------- 3. Помощная функция с retries + timeout -----------
  136. async def fetch_with_retries(
  137.     session: aiohttp.ClientSession,
  138.     url: str,
  139.     json_data: dict,
  140.     max_retries: int = 3,
  141.     delay_on_retry: float = 1.0,
  142.     request_timeout: float = 10.0  # добавили таймаут по умолчанию в 10 секунд
  143. ) -> dict:
  144.     """
  145.    Асинхронная функция для HTTP POST запроса с повторами.
  146.    Если все попытки исчерпаны, выбрасывается последнее исключение.
  147.  
  148.    :param session: Экземпляр aiohttp.ClientSession
  149.    :param url: Адрес, куда посылаем запрос
  150.    :param json_data: Тело (JSON) для POST
  151.    :param max_retries: Сколько раз будем пробовать при ошибках
  152.    :param delay_on_retry: Задержка перед повтором (сек)
  153.    :param request_timeout: Сколько ждать (сек) до таймаута
  154.    :return: Результат, десериализованный из JSON
  155.    """
  156.     timeout = aiohttp.ClientTimeout(total=request_timeout)
  157.     for attempt in range(1, max_retries + 1):
  158.         try:
  159.             async with session.post(url, json=json_data, timeout=timeout) as resp:
  160.                 resp.raise_for_status()
  161.                 return await resp.json()
  162.         except (aiohttp.ClientError, asyncio.TimeoutError) as e:
  163.             if attempt == max_retries:
  164.                 # Уже исчерпали все попытки, пробрасываем исключение дальше
  165.                 raise
  166.             # Ждем перед повтором
  167.             await asyncio.sleep(delay_on_retry)
  168.  
  169.  
  170. # ----------- 4. Воркер -----------
  171. async def worker(
  172.     requests_queue: asyncio.Queue,
  173.     session: aiohttp.ClientSession,
  174.     results: list,
  175.     semaphore: asyncio.Semaphore,
  176.     request_timeout: float = 10.0
  177. ) -> None:
  178.     """
  179.    Воркер, который:
  180.    1. Берёт (url, data) из очереди.
  181.    2. Делает запрос к "some-service" -> получает item_ids.
  182.    3. Если item_ids не пуст, вызывает три независимых сервиса (c ограничением параллелизма).
  183.    4. Пропускает ответы через business_logic и добавляет результат в results.
  184.    5. Повторяет до тех пор, пока в очереди не появится "sentinel" (None).
  185.    """
  186.     while True:
  187.         url_data = await requests_queue.get()
  188.         if url_data is None:
  189.             # sentinel для завершения
  190.             requests_queue.task_done()
  191.             break
  192.  
  193.         url, request_data = url_data
  194.         try:
  195.             # 1. Получаем item_ids (с таймаутом и повторами)
  196.             get_items_resp = await fetch_with_retries(
  197.                 session,
  198.                 url,
  199.                 request_data,
  200.                 request_timeout=request_timeout
  201.             )
  202.             item_ids = get_items_resp.get("item_ids", [])
  203.             if not item_ids:
  204.                 # Если пусто, просто записываем флаг в results
  205.                 results.append({"final_data": None, "error": "no_items"})
  206.             else:
  207.                 # 2. Запрашиваем три сервиса параллельно, но под семафором
  208.                 async with semaphore:
  209.                     s1_task = asyncio.create_task(fetch_with_retries(
  210.                         session,
  211.                         service_1_url,
  212.                         {"item_ids": item_ids},
  213.                         request_timeout=request_timeout
  214.                     ))
  215.                     s2_task = asyncio.create_task(fetch_with_retries(
  216.                         session,
  217.                         service_2_url,
  218.                         {"item_ids": item_ids},
  219.                         request_timeout=request_timeout
  220.                     ))
  221.                     s3_task = asyncio.create_task(fetch_with_retries(
  222.                         session,
  223.                         service_3_url,
  224.                         {"item_ids": item_ids},
  225.                         request_timeout=request_timeout
  226.                     ))
  227.  
  228.                     service1_response, service2_response, service3_response = await asyncio.gather(
  229.                         s1_task, s2_task, s3_task
  230.                     )
  231.  
  232.                 # 3. Комбинируем ответы
  233.                 final_result = business_logic(
  234.                     service1_response,
  235.                     service2_response,
  236.                     service3_response
  237.                 )
  238.                 results.append(final_result)
  239.  
  240.         except Exception as e:
  241.             # Логируем / обрабатываем ошибку, записываем, что пошло не так
  242.             results.append({"final_data": None, "error": str(e)})
  243.         finally:
  244.             requests_queue.task_done()
  245.  
  246.  
  247. # ----------- 5. Основная корутина для запуска воркеров -----------
  248. async def main_concurrent_requests(
  249.     requests_list: list,
  250.     max_workers: int = 100,
  251.     max_parallel_service_calls: int = 100,
  252.     request_timeout: float = 10.0
  253. ) -> List[Any]:
  254.     """
  255.    Основная корутина:
  256.    1. Создаёт очередь и складывает туда все request'ы.
  257.    2. Запускает пул из N воркеров.
  258.    3. Собирает результаты в список results.
  259.    4. Дожидается завершения очереди и воркеров.
  260.    """
  261.     results = []
  262.     requests_queue = asyncio.Queue()
  263.  
  264.     # Заполняем очередь
  265.     for req in requests_list:
  266.         requests_queue.put_nowait(req)
  267.  
  268.     # Ставим semaphore для ограничения параллельных вызовов к сервисам
  269.     semaphore = asyncio.Semaphore(max_parallel_service_calls)
  270.  
  271.     async with aiohttp.ClientSession() as session:
  272.         # Запускаем воркеров
  273.         workers = [
  274.             asyncio.create_task(
  275.                 worker(requests_queue, session, results, semaphore, request_timeout)
  276.             )
  277.             for _ in range(max_workers)
  278.         ]
  279.  
  280.         # Ждём, пока все задания в очереди будут выполнены
  281.         await requests_queue.join()
  282.  
  283.         # Добавляем sentinels, чтобы воркеры завершили цикл
  284.         for _ in range(max_workers):
  285.             await requests_queue.put(None)
  286.  
  287.         # Ждём завершения всех воркеров
  288.         await asyncio.gather(*workers)
  289.  
  290.     return results
  291.  
  292.  
  293. # ----------- 6. Точка входа -----------
  294. def run_solution():
  295.     """
  296.    Точка входа для вызова решения.
  297.    Здесь можно варьировать число воркеров, уровень параллелизма и т.д.
  298.    """
  299.     # Для примера возьмём лишь первые N запросов (или все 10**6 при реальном запуске).
  300.     subset_requests = requests_samples  # можно укоротить срезом: requests_samples[:1000]
  301.  
  302.     results = asyncio.run(
  303.         main_concurrent_requests(
  304.             requests_list=subset_requests,
  305.             max_workers=50,               # Количество асинхронных воркеров
  306.             max_parallel_service_calls=100,  # Ограничение параллельных запросов к сервисам
  307.             request_timeout=10.0            # Таймаут (сек) на каждый запрос
  308.         )
  309.     )
  310.  
  311.     # Пример вывода первых нескольких результатов
  312.     print("Sample of results:", results[:5])
  313.  
  314.  
  315. # ----------------- Запуск (при реальном использовании) -----------------
  316. if __name__ == "__main__":
  317.     run_solution()
  318.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement