| Date: | 2020-11-10 09:00 |
|---|---|
| summary: | Работа с асинхронными операциями. |
| Status: | published |
Contents
В программировании эти два понятия достаточно близки друг к другу, однако в то же время имеют существенную разницу.
- Parallel execution (параллелизм) — исполнение нескольких задач одновременно. Для достижения параллелизма необходимо физическое одноврменное исполнение задач (multithreading и multiprocessing).
- Concurrency (конкурентность) — две или более задачи могут запускаться, выполняться и завершаться в перекрывающиеся периоды времени.
Основным отличием конкурентности является то, что это логическое распараллеливание программы. Если в вашем ПК только одно ядро, то в один момент времени может исполняться только одна задача. Однако в случае конкурентности исполенение задач может приостанавливаться с целью переключения на другие задачи. Тем самым прогресс исполнения нескольких задач может идти одновременно по нескольким задачам даже в случае физически невозможного их одновременного исполнения.
Как упоминалось в предыдущей теме, мы можем хотим продолжать вычисления параллельно с операциями ввода/вывода. Такие операции достаточно медленные, и их вызов блокирует поток исполнения программы. Поэтому эти операции выносили в отдельный поток.
Асинхронные операции ввода/вывода позволяют программе продолжать выполнение, не дожидаясь результата их исполнения. Как только такая операция завершается, вызывается исполнение функции обратного вызова (callback). Это написанная вами функция, в которой можно реализовать необходимую логику.
# функция, отвечающая за обработку ответа
def handle_response(response):
print('\n{:.70}...'.format(response.body))
# создание объекта для общения с сетью
http_client = AsyncHTTPClient()
# неблокирующий вызов функции!
# после вызова функции fetch будет выполняться следующий за этой строчкой код без ожидания получения ответа
# ответ с сайта будет обработан функцией handle_response (так называемым callback'ом)
http_client.fetch('http://yandex.ru', callback=handle_response)Проблема данного подхода заключается в том, что внутри одной callback функции может быть вызвана другая и т.д. Такой код становится трудно читаем, а стек вызова становится достаточно запутанным.
Корутина (coroutine) - подпрограмма (функция), которая может начинаться, приостанавливаться и завершаться в произвольный момент времени. Корутины описываются синтаксисом async/await.
Сходу рассмотрим несколько примеров.
import asyncio
async def main():
print('hello')
await asyncio.sleep(1)
print('world')
loop = asyncio.get_event_loop()
# Blocking call which returns when the main() coroutine is done
loop.run_until_complete(main())
loop.close()В этом примере:
- Управление будет передано в корутину main;
- Напечатается hello;
- Управление будет передано в корутину sleep;
- В течении одной секунды программа "спит";
- Управление вернется в main;
- Напечатается wolrd;
- Корутина main, а далее сама программа завершаются.
Следующий пример напечатает “hello” после ожидания в 1 секунду, а затем напечатает “world” после ожидания в 2 секунды:
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
print(f"started at {time.strftime('%X')}")
await say_after(1, 'hello')
await say_after(2, 'world')
print(f"finished at {time.strftime('%X')}")
loop = asyncio.get_event_loop()
# Blocking call which returns when the main() coroutine is done
loop.run_until_complete(main())
loop.close()В следующем примере мы запустим две задачи на параллельное исполнение.
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
loop = asyncio.get_event_loop()
# Ожидание завершения обоих операций должно занять около 2х секунд
print(f"started at {time.strftime('%X')}")
loop.run_until_complete(asyncio.gather(
say_after(1, 'hello'),
say_after(2, 'world')
))
print(f"finished at {time.strftime('%X')}")
loop.close()При работе с асинхронностью мы встретились с понятием цикл событий (event loop). Это программная конструкция, которая управляет выполнением различных задач: регистрирует поступление и запускает в подходящий момент.
C помощью синтаксиса await мы определяем места, где можно переключиться на другие ожидающие выполнения задачи.
Рассмотрим подробнее следующий пример:
import asyncio
async def compute(a, b):
print('Compute...')
await asyncio.sleep(1.0)
return a + b
async def print_sum(a, b):
result = await compute(a, b)
print('{} + {} = {}'.format(a, b, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()Начиная с версии Python 3.7 синтаксис работы с библиотекой (а именно создание цикла событий) был упрощен. Подробнее про библиотеку можно узнать здесь.
Что будет напечатано и почему?
import asyncio
async def factorial(name, number):
f = 1
for i in range(2, number + 1):
print(f"Task {name}: Compute factorial({i})...")
await asyncio.sleep(1)
f *= i
print(f"Task {name}: factorial({number}) = {f}")
async def main():
await asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()Иногда выполнение операции может занять очень длительное время. Например, вы до сих пор не получили ответ от сервера. В случае отсутствия соединения ваша операция может висеть бесконечно долго. В таком случае на асинхронные операции имеет смысл выставлять timeout. Пример на выставление timeout:
import asyncio
async def eternity():
# Sleep for one hour
await asyncio.sleep(3600)
print('yay!')
async def main():
# Wait for at most 1 second
try:
await asyncio.wait_for(eternity(), timeout=1.0)
except asyncio.TimeoutError:
print('timeout!')
loop = asyncio.get_event_loop()
loop.run_until_complete(main())Асинхронный контекстный менеджер - это контекстный менджер, который умеет приостанавливать выполнение в методах входа и выхода: __aenter__(), __aexit__()
lock = asyncio.Lock()
# ... later
await lock.acquire()
try:
# access shared state
finally:
lock.release()lock = asyncio.Lock()
# ... later
async with lock:
# access shared stateФутуры (futures) — объекты, в которых хранится текущий результат выполнения какой-либо задачи. Это может быть информация о том, что задача ещё не обработана или уже полученный результат; а может быть вообще исключение.
Одна из особенностей футур, что мы можем запустить задачу на исполнение в одной корутине, а получить результат выполнения в другой. У футур есть 4 возможных состояния: + ожидание (pending) + выполнение (running) + выполнено (done) + отменено (cancelled)
Когда футура находится в состояние done, у неё можно получить результат выполнения. В состояниях pending и running такая операция приведёт к исключению InvalidStateError, а в случае canelled будет CancelledError, и наконец, если исключение произошло в самой корутине, оно будет сгенерировано снова при попытке получить результат.
Узнать состояние футуры можно с помощью методов done() или cancelled(), Вызов result() возвращает ожидаемый результат. Для получения исключения есть метод exception(). Для отмены выполнения футуры есть метод cancel(). И result() и exception() выбросят CancelledError, если футура была остановлена в процессе работы.
Ожидание окончания футуры можно сделать при помощи функции wait_for(). Первый аргумент - футура, второй - таймаут (None, если таймаут не нужен).
import asyncio
async def set_after(delay, value):
# Sleep for *delay* seconds.
await asyncio.sleep(delay)
# Set *value* as a result of *fut* Future.
return value
async def main():
# Run "set_after()" coroutine in a parallel Task.
fut = asyncio.ensure_future(
set_after(1, '... world'))
print('hello ...')
# Wait until *fut* has a result (1 second)
await asyncio.wait_for(fut, None)
# and print it.
print(fut.result())
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()Возможен запуск футуры при помощи await.
async def main():
# Run "set_after()" coroutine in a parallel Task.
fut = asyncio.ensure_future(
set_after(1, '... world'))
print('hello ...')
# Wait until *fut* has a result (1 second) and print it.
# Alternative way to get a result, just use it.
print(await fut)Рядом с asyncio создано огромное количество асинхронных модулей для решения всевозможных задач. aiohttp - лишь одна из них. Это асинхронный HTTP Клиент/Сервер
В следующем примере получаем содержимое страницы google.com: (при отсутствии доступа в интернет, cs.mipt.ru)
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.get('http://google.com') as resp:
text = await resp.text()
print('{:.70}...'.format(text))Реализация простого сервера:
from aiohttp import web
async def handle(request):
name = request.match_info.get('name', 'Anonymous')
text = 'Hello, ' + name
# ...
# здесь идет некоторая дополнительная логика с async/await
#
return web.Response(text=text)
app = web.Application()
app.add_routes([web.get('/', handle),
web.get('/{name}', handle)])
web.run_app(app)Узнать свой IP адрес. Есть куча сервисов, которые позволяют узнать ваш ip. Но на момент запуска программы вы не знаете какой из сервисов доступен. Вместо того, чтобы опрашивать каждый из этих сервисов последовательно, можно запустить все запросы конкурентно и выбрать первый успешный.
При отсутствии доступа в интернет симулируйте задачу через cs.mipt.ru (к примеру, получение страниц вида cs.mipt.ru/advanced_python/lessons/labX.html и выбора первой, в которой количество символов больше, чем N)
Потребуется asyncio.wait() и параметр return_when
from collections import namedtuple
import time
import asyncio
from concurrent.futures import FIRST_COMPLETED
import aiohttp
Service = namedtuple('Service', ('name', 'url', 'ip_attr'))
SERVICES = (
Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
Service('ip-api', 'http://ip-api.com/json', 'query')
)
async def fetch_ip(service):
# получение ip
async def asynchronous():
# TODO:
# создание футур для сервисов
# используйте FIRST_COMPLETED
ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())Это библиотека для написания асинхронного Telegram бота.
Напишите телеграм бота, который будет на сообщение присылать соответствующее изображение
- установить aiogram 1.4 - асинхронная обертка над api телеграмма
- поговорить с @FatherBot, создать бота и запомнить выданный токен
- В рф нужно использовать впн или прокси (в сети есть огромное количество списков адресов)
- разобраться с примером эхо бота ниже
- написать требуемый функционал (картинки можно запрашивать через поиск яндекса или гугла, существуют готовые api, можно написать и самостоятельно)
from aiogram import Bot, types
from aiogram.dispatcher import Dispatcher
from aiogram.utils import executor
PROXY_URL = 'socks5://xxx.xxx.xxx.xxx' # вставить здесь подходящий ip
secret_token = 'XXX' # строка вида: 123456789:ABCDEFGHJABCDEFGHJABCDEFGHJABCDEFGHJ
bot = Bot(token=secret_token, proxy=PROXY_URL)
dp = Dispatcher(bot)
@dp.message_handler(commands=['start', 'help'])
async def send_welcome(message: types.Message):
await message.reply("Hi!\nI'm EchoBot!\nPowered by aiogram.")
@dp.message_handler()
async def echo(message: types.Message):
await message.reply(message.text)
if __name__ == '__main__':
executor.start_polling(dp)