Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """
- Основные определения
- """
- import asyncio
- import types
- # платформенная сопрограмма
- async def coro():
- await asyncio.sleep(1)
- def gen():
- yield 11
- # Классическая сопрограмма
- def classic_coro():
- for _ in range(10):
- yield from gen()
- x = yield "YIELD"
- print(x)
- c = classic_coro()
- print(next(c))
- print(next(c))
- c.send(1)
- print(next(c))
- c.send(2)
- print(next(c))
- c.send(3)
- # Генераторные сопрограммы
- @types.coroutine
- def gen_coro():
- for _ in range(10):
- yield from asyncio.sleep(1)
- async def test():
- await gen_coro()
- # asyncio.run(test())
- """
- В asyncio есть генератор, отдающий переданные ему сопрограммы, в порядке готовности, а не в порядке подачи.
- Это выглядит как параллельное исполнение сопрограмм
- """
- import random
- async def test_coro(id_: int):
- await asyncio.sleep(random.randint(1, 5))
- return id_
- async def task(id_: int):
- await asyncio.sleep(5)
- return id_
- async def main_test():
- coros = []
- tasks = []
- for i in range(5):
- tasks.append(
- asyncio.create_task(task(i))
- )
- for i in range(5):
- coros.append(test_coro(i))
- for c in asyncio.as_completed(coros):
- print("ID: ", await c)
- # Таски выполняются в фоне, хоть и спят. 5 сек. Они спят асинхронно с test_coro корутинами.
- # Нам необязательно дожидаться, пока таска будет выполнена. Но если вы этого хотим, достаточно просто
- # вызвать таску с await если мы хотим передать ей управление в конкретной корутине
- for t in asyncio.as_completed(tasks):
- print("TASK ID: ", await t)
- # asyncio.run(main_test())
- """
- Если вы не можете переписать блокирующую функцию, как асинхронную. То её надо запустить в отдельном потоке или процессе.
- """
- """
- В asyncio есть возможность использовать семафоры и потоки.
- Семафор работает так: асинхронный контекстный менеджер семафора в asyncio не блокирует программу целиком.
- Он блокирует только конкретную сопрограмму, если счетчик семаформа обращается в ноль.
- Также, можно вынести отдельную функцию в поток, если нет возможности сделать её асинхронной.
- """
- import time
- def dont_coro(id_: int):
- time.sleep(2)
- return id_
- # Выполнение в отдельном потоке и асинхронно
- async def test_thread():
- thread_tasks = []
- for i in range(5):
- thread_tasks.append(asyncio.create_task(asyncio.to_thread(dont_coro, i)))
- print("test")
- for _, dc in zip(range(10), asyncio.as_completed(thread_tasks)):
- print("T ID: ", await dc)
- await asyncio.sleep(1)
- print("HELLO FROM test_thread")
- # asyncio.run(test_thread())
- """
- Рекомендуется помещать перед ASGI приложением прокси-сервер, который обрабатывает все статические файлы, а также по
- возможности, использовать CDN(система доставки контента)
- В состав FastAPI входят скрипты для генерации проектов
- """
- async def value():
- return random.randint(1, 10)
- # Асинхронные генераторы
- async def agen():
- for _ in range(5):
- yield await value()
- async def main():
- async for v in agen():
- print(v)
- # asyncio.run(main())
- """
- Чтобы сделать так, чтобы Асинхронные генераторы стали поддерживать контекстные менеджеры, надо применить к ним
- декоратор asynccontextmanager
- """
- # Пример асинхронного включения:
- async def acomp():
- print([x async for x in agen()])
- # asyncio.run(acomp())
- """
- Также можно определить асинхронное генераторное выражение (x async for x in agen()) где угодно в программе,
- но использовать его можно только в контексте асинхронного кода.
- """
- """
- Чтобы контролировать исключения в функции
- asyncio.gather()
- Надо задать аргумент return_exceptions при вызове как return_exceptions=True
- По умолчанию return_exceptions=False
- """
- """
- Если вручную создать таски в через create_task, то надо дождаться, что они закончатся явно.
- Например через await или as_complited
- """
- """
- У каждой задачи можно задать имя, это можно использовать как id задачи)))))))))))))))))
- Пример:
- """
- async def some_task(wait: int) -> int:
- print("some_task with wait: ", wait)
- await asyncio.sleep(wait)
- return 42
- async def some_main():
- tasks = [
- asyncio.create_task(some_task(num), name=f"number_{num}") for num in range(5)
- ]
- for task in tasks:
- print("...")
- await task
- if task.get_name() == "number_4":
- print("NUM 4 IS RUNNING!")
- task.cancel()
- print("NUM 4 IS CANCELED!")
- # asyncio.run(some_main())
- """
- Задачи можно отменить с помощью метода .cancel() с обычными корутинами так не сделать.
- """
- """
- Документация по Python 3.11+ рекомендует использовать TaskGroup() вместо gather()
- TaskGroup - это асинхронный контекстный менеджер. Все таски в нем создаются, но await'тя после уже выхода из контекста
- Пример:
- """
- async def task(id_: int):
- await asyncio.sleep(1)
- print(f"Hello from task with id: {id_}")
- if id_ % 2 == 0:
- raise Exception
- return id_ * 100
- # Можно выполнить хоть 500 задач, все равно они будут выполнены за ~1 секунду
- async def task_group():
- results = []
- try:
- async with asyncio.TaskGroup() as tasks:
- for i in range(500):
- results.append(
- tasks.create_task(
- task(i)
- )
- )
- print("inner context manager")
- except Exception as e:
- print(f"ERROR: {e}")
- """
- ПО КАКОЙ-ТО ПРИЧИНЕ, ИСКЛЮЧЕНИЕ ПРОБРАСЫВАЕТСЯ НЕ ТОЛЬКО В КОНТЕКСТНЫЙ МЕНЕДЖЕР, НО И В МЕТОД
- .result()
- """
- try:
- # Получение результатов от задач
- print("outer context manager", [t.result() for t in results])
- except:
- ...
- # asyncio.run(task_group())
- """
- Есть также возможость запустить задачи через wait.
- Интересный способ. По завершению wait вернет список выполненых задач и тех, которые были не выполнены.
- Например, из-за таймаута
- wait имеет параметр return_when у которого есть 3 состояния:
- * FIRST_COMPLETED - Вернуть done, pending когда выполнена, хотя бы одна задача
- * FIRST_EXCEPTION - вернуть done, pending когда было хотя бы одно исключение
- * ALL_COMPLETED(По умолчанию) - done, pending, когда выполнены все задачи
- Пример:
- """
- async def wait_task(wait_time: int):
- await asyncio.sleep(wait_time)
- async def wait_test():
- tasks = [
- asyncio.create_task(wait_task(1), name="d1"),
- asyncio.create_task(wait_task(2), name="d2"),
- asyncio.create_task(wait_task(3), name="d3"),
- ]
- done, pending = await asyncio.wait(tasks, timeout=2.1)
- print(
- f"{[d.get_name() for d in done]}, "
- f"{[p.get_name() for p in pending]}"
- )
- # asyncio.run(wait_test())
- """
- Для работы с файлами асинхронно можно юзать aiofiles это внешная библиотека
- """
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement