Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # Написать свой reverse через генератор
- # Условие задачи
- # - дан список из 1 миллиона URL, которые требуется опросить
- # - каждый запрос возвращает несколько значений item_id
- # - с полученными данными необходимо провести опрос трёх независимых сервисов
- # - из ответов этих сервисов, следует сформировать итоговое значение с использованием функции business_logic
- # - все результаты следует сохранить в итоговый список
- from typing import List
- # format: url, request_data
- # sample size: 10**6
- requests_samples = [
- ('http://some-service/getItems/', {'user_id': 100}), # вернет {'item_ids': [1, 2, 3]}
- ('http://some-service/getItems/', {'user_id': 101}),
- ...
- ]
- service_1_url = 'http://service1/fillItems/'
- service_2_url = 'http://service2/scoreItems/'
- service_3_url = 'http://service3/logItems/'
- def business_logic(service1_response, service2_response, service3_response) -> dict:
- # эта функция не делает сетевых вызовов, только обрабатывает ответы
- # считайте, что она уже написана
- return {}
- pool = multiproccessing.Pool(processes=os.processes.cpu_count() - 1)
- queue = asyncio.Queue()
- '''
- # схема
- 1. ('http://some-service/getItems/', {'user_id': 100}) -> # вернет item_ids: [1, 2, 3]
- 2. в любом порядке и независимо опрашиваем service_1-2-3_url с item_ids
- 3. собираем результат при помощи business_logic
- 4. вы великолепны! повторить миллион раз.
- '''
- -------------------------------------------------------------------------------------------------------------------------------
- import asyncio
- import aiohttp
- from typing import List, Any
- # ----------- 1. Собственный reverse через генератор -----------
- def my_reverse(iterable):
- """
- Генератор, возвращающий элементы iterable в обратном порядке.
- Пример:
- my_reverse([1, 2, 3]) -> 3, 2, 1
- """
- for i in range(len(iterable) - 1, -1, -1):
- yield iterable[i]
- # ----------- 2. Исходные данные / сервисы -----------
- requests_samples = [
- ('http://some-service/getItems/', {'user_id': 100}),
- ('http://some-service/getItems/', {'user_id': 101}),
- ('http://some-service/getItems/', {'user_id': 102}),
- # ... и т. д. до 10**6
- ]
- service_1_url = 'http://service1/fillItems/'
- service_2_url = 'http://service2/scoreItems/'
- service_3_url = 'http://service3/logItems/'
- def business_logic(service1_response, service2_response, service3_response) -> dict:
- """
- Псевдо-функция, которая комбинирует ответы трёх сервисов в единый результат.
- Считаем, что внутри никакой сети нет, просто обработка данных.
- """
- return {
- "final_data": (
- service1_response.get('someKey', 0)
- + service2_response.get('anotherKey', 0)
- + service3_response.get('thirdKey', 0)
- )
- }
- # ----------- 3. Помощная функция с retries + timeout -----------
- async def fetch_with_retries(
- session: aiohttp.ClientSession,
- url: str,
- json_data: dict,
- max_retries: int = 3,
- delay_on_retry: float = 1.0,
- request_timeout: float = 10.0 # добавили таймаут по умолчанию в 10 секунд
- ) -> dict:
- """
- Асинхронная функция для HTTP POST запроса с повторами.
- Если все попытки исчерпаны, выбрасывается последнее исключение.
- :param session: Экземпляр aiohttp.ClientSession
- :param url: Адрес, куда посылаем запрос
- :param json_data: Тело (JSON) для POST
- :param max_retries: Сколько раз будем пробовать при ошибках
- :param delay_on_retry: Задержка перед повтором (сек)
- :param request_timeout: Сколько ждать (сек) до таймаута
- :return: Результат, десериализованный из JSON
- """
- timeout = aiohttp.ClientTimeout(total=request_timeout)
- for attempt in range(1, max_retries + 1):
- try:
- async with session.post(url, json=json_data, timeout=timeout) as resp:
- resp.raise_for_status()
- return await resp.json()
- except (aiohttp.ClientError, asyncio.TimeoutError) as e:
- if attempt == max_retries:
- # Уже исчерпали все попытки, пробрасываем исключение дальше
- raise
- # Ждем перед повтором
- await asyncio.sleep(delay_on_retry)
- # ----------- 4. Воркер -----------
- async def worker(
- requests_queue: asyncio.Queue,
- session: aiohttp.ClientSession,
- results: list,
- semaphore: asyncio.Semaphore,
- request_timeout: float = 10.0
- ) -> None:
- """
- Воркер, который:
- 1. Берёт (url, data) из очереди.
- 2. Делает запрос к "some-service" -> получает item_ids.
- 3. Если item_ids не пуст, вызывает три независимых сервиса (c ограничением параллелизма).
- 4. Пропускает ответы через business_logic и добавляет результат в results.
- 5. Повторяет до тех пор, пока в очереди не появится "sentinel" (None).
- """
- while True:
- url_data = await requests_queue.get()
- if url_data is None:
- # sentinel для завершения
- requests_queue.task_done()
- break
- url, request_data = url_data
- try:
- # 1. Получаем item_ids (с таймаутом и повторами)
- get_items_resp = await fetch_with_retries(
- session,
- url,
- request_data,
- request_timeout=request_timeout
- )
- item_ids = get_items_resp.get("item_ids", [])
- if not item_ids:
- # Если пусто, просто записываем флаг в results
- results.append({"final_data": None, "error": "no_items"})
- else:
- # 2. Запрашиваем три сервиса параллельно, но под семафором
- async with semaphore:
- s1_task = asyncio.create_task(fetch_with_retries(
- session,
- service_1_url,
- {"item_ids": item_ids},
- request_timeout=request_timeout
- ))
- s2_task = asyncio.create_task(fetch_with_retries(
- session,
- service_2_url,
- {"item_ids": item_ids},
- request_timeout=request_timeout
- ))
- s3_task = asyncio.create_task(fetch_with_retries(
- session,
- service_3_url,
- {"item_ids": item_ids},
- request_timeout=request_timeout
- ))
- service1_response, service2_response, service3_response = await asyncio.gather(
- s1_task, s2_task, s3_task
- )
- # 3. Комбинируем ответы
- final_result = business_logic(
- service1_response,
- service2_response,
- service3_response
- )
- results.append(final_result)
- except Exception as e:
- # Логируем / обрабатываем ошибку, записываем, что пошло не так
- results.append({"final_data": None, "error": str(e)})
- finally:
- requests_queue.task_done()
- # ----------- 5. Основная корутина для запуска воркеров -----------
- async def main_concurrent_requests(
- requests_list: list,
- max_workers: int = 100,
- max_parallel_service_calls: int = 100,
- request_timeout: float = 10.0
- ) -> List[Any]:
- """
- Основная корутина:
- 1. Создаёт очередь и складывает туда все request'ы.
- 2. Запускает пул из N воркеров.
- 3. Собирает результаты в список results.
- 4. Дожидается завершения очереди и воркеров.
- """
- results = []
- requests_queue = asyncio.Queue()
- # Заполняем очередь
- for req in requests_list:
- requests_queue.put_nowait(req)
- # Ставим semaphore для ограничения параллельных вызовов к сервисам
- semaphore = asyncio.Semaphore(max_parallel_service_calls)
- async with aiohttp.ClientSession() as session:
- # Запускаем воркеров
- workers = [
- asyncio.create_task(
- worker(requests_queue, session, results, semaphore, request_timeout)
- )
- for _ in range(max_workers)
- ]
- # Ждём, пока все задания в очереди будут выполнены
- await requests_queue.join()
- # Добавляем sentinels, чтобы воркеры завершили цикл
- for _ in range(max_workers):
- await requests_queue.put(None)
- # Ждём завершения всех воркеров
- await asyncio.gather(*workers)
- return results
- # ----------- 6. Точка входа -----------
- def run_solution():
- """
- Точка входа для вызова решения.
- Здесь можно варьировать число воркеров, уровень параллелизма и т.д.
- """
- # Для примера возьмём лишь первые N запросов (или все 10**6 при реальном запуске).
- subset_requests = requests_samples # можно укоротить срезом: requests_samples[:1000]
- results = asyncio.run(
- main_concurrent_requests(
- requests_list=subset_requests,
- max_workers=50, # Количество асинхронных воркеров
- max_parallel_service_calls=100, # Ограничение параллельных запросов к сервисам
- request_timeout=10.0 # Таймаут (сек) на каждый запрос
- )
- )
- # Пример вывода первых нескольких результатов
- print("Sample of results:", results[:5])
- # ----------------- Запуск (при реальном использовании) -----------------
- if __name__ == "__main__":
- run_solution()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement