От автора: в этом руководстве вы познакомитесь с функциями асинхронного ввода-вывода, представленными в Python 3.4 и улучшенными в Python 3.5 и 3.6.
Ранее в Python было мало вариантов для асинхронного программирования. Новая функция асинхронного ввода-вывода, наконец, обеспечивает необходимую поддержку, которая включает в себя как высокоуровневые API, так и стандартную поддержку, нацеленную на объединение нескольких сторонних решений (Twisted, Gevent, Tornado, asyncore и т.д.).
Важно понимать, что изучение асинхронного ввода-вывода Python не является простой задачей из-за скоростной итерации, области действия и необходимости обеспечить способ миграции на существующие асинхронные среды. Я сосредоточусь на последних и самых простых аспектах.
Существует множество вещей, которые интересным образом взаимодействуют через границы потоков, границы процессов и удаленные машины. Существуют отличия и ограничения для конкретной платформы. Давайте разберемся в этом.
Подключаемый цикл событий
Основной концепцией асинхронного ввода-вывода является цикл обработки событий. В программе может быть несколько циклов событий. Каждый поток будет иметь не более одного активного цикла обработки событий. Цикл обработки событий предоставляет следующие возможности:
Регистрация, выполнение и отмена отложенных вызовов (с задержками).
Создание клиентских и серверных транспортов для различных видов связи.
Запуск подпроцессов и связанных транспортов для связи с внешней программой.
Делегирование ресурсозатратных вызовов функций в пул потоков.
Вот простой пример, в котором запускаются две сопрограммы и вызывается функция с задержкой. Он показывает, как использовать цикл обработки событий для работы вашей программы:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
import asyncio async def foo(delay): for i in range(10): print(i) await asyncio.sleep(delay) def stopper(loop): loop.stop() loop = asyncio.get_event_loop() # Schedule a call to foo() loop.create_task(foo(0.5)) loop.create_task(foo(1)) loop.call_later(12, stopper, loop) # Block until loop.stop() is called() loop.run_forever() loop.close() |
Класс AbstractEventLoop обеспечивает базовое соглашение для циклов событий. Есть много вещей, которые должен поддерживать цикл обработки событий:
Планирование функций и сопрограмм для выполнения
Создание фьючерсов и задач
Управление TCP-серверами
Обработка сигналов (в Unix)
Работа с пайпами и подпроцессами
Вот методы, связанные с запуском и остановкой события, а также планированием функций и сопрограмм:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
class AbstractEventLoop: """Abstract event loop.""" # Running and stopping the event loop. def run_forever(self): """Run the event loop until stop() is called.""" raise NotImplementedError def run_until_complete(self, future): """Run the event loop until a Future is done. Return the Future's result, or raise its exception. """ raise NotImplementedError def stop(self): """Stop the event loop as soon as reasonable. Exactly how soon that is may depend on the implementation, but no more I/O callbacks should be scheduled. """ raise NotImplementedError def is_running(self): """Return whether the event loop is currently running.""" raise NotImplementedError def is_closed(self): """Returns True if the event loop was closed.""" raise NotImplementedError def close(self): """Close the loop. The loop should not be running. This is idempotent and irreversible. No other methods should be called after this one. """ raise NotImplementedError def shutdown_asyncgens(self): """Shutdown all active asynchronous generators.""" raise NotImplementedError # Methods scheduling callbacks. All these return Handles. def _timer_handle_cancelled(self, handle): """Notification that a TimerHandle has been cancelled.""" raise NotImplementedError def call_soon(self, callback, *args): return self.call_later(0, callback, *args) def call_later(self, delay, callback, *args): raise NotImplementedError def call_at(self, when, callback, *args): raise NotImplementedError def time(self): raise NotImplementedError def create_future(self): raise NotImplementedError # Method scheduling a coroutine object: create a task. def create_task(self, coro): raise NotImplementedError # Methods for interacting with threads. def call_soon_threadsafe(self, callback, *args): raise NotImplementedError def run_in_executor(self, executor, func, *args): raise NotImplementedError def set_default_executor(self, executor): raise NotImplementedError |
Подключение нового цикла событий
Asyncio предназначен для поддержки нескольких реализаций циклов событий, которые придерживаются его API. Ключ — это класс EventLoopPolicy, который настраивает asyncio и позволяет контролировать каждый аспект цикла событий. Вот пример пользовательского цикла обработки событий uvloop , основанного на libuv, который должен быть намного быстрее, чем его альтернативы (я не тестировал его сам):
1 2 3 |
import asyncio import uvloop asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) |
Вот и все. Теперь, когда вы используете любую функцию asyncio, она запускает uvloop под капотом.
Сопрограммы, фьючерсы и задачи
Сопрограмма — это составной термин. Это функция, которая выполняется асинхронно, и объект, который необходимо запланировать. Вы определяете их, добавляя ключевое слово async перед определением:
1 2 3 4 5 |
import asyncio async def cool_coroutine(): return "So cool..." |
Если вы вызовите такую функцию, она не запустится. Вместо этого она возвращает объект сопрограммы, и если вы не запланируете его выполнение, вы также получите предупреждение:
1 2 3 4 5 6 7 8 9 |
c = cool_coroutine() print(c) Output: <coroutine object cool_coroutine at 0x108a862b0> sys:1: RuntimeWarning: coroutine 'cool_coroutine' was never awaited Process finished with exit code 0 |
Чтобы на самом деле выполнить сопрограмму, нам нужен цикл обработки событий:
1 2 3 4 5 6 7 8 |
r = loop.run_until_complete(c) loop.close() print(r) Output: So cool... |
Это прямое планирование. Вы также можете поставить в очередь сопрограммы. Обратите внимание, что при вызове сопрограмм вы должны вызывать await:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
import asyncio async def compute(x, y): print("Compute %s + %s ..." % (x, y)) await asyncio.sleep(1.0) return x + y async def print_sum(x, y): result = await compute(x, y) print("%s + %s = %s" % (x, y, result)) loop = asyncio.get_event_loop() loop.run_until_complete(print_sum(1, 2)) loop.close() |
Класс asyncio Future аналогичен классу concurrent.future.Future. Он не является потокобезопасным и поддерживает следующие функции:
добавление и удаление готовых обратных вызовов
отмена
настройка результатов и исключений
Вот как использовать фьючерсы с циклом событий. Сопрограмма take_your_time() принимает фьючерс и устанавливает свой результат после простоя в течение секунды.
Функций ensure_future() планирует сопрограмму, а wait_until_complete() ожидает выполнения фьючерса. Под капотом она добавляет готовый обратный вызов к фьючерсу.
1 2 3 4 5 6 7 8 9 10 11 12 |
import asyncio async def take_your_time(future): await asyncio.sleep(1) future.set_result(42) loop = asyncio.get_event_loop() future = asyncio.Future() asyncio.ensure_future(take_your_time(future)) loop.run_until_complete(future) print(future.result()) loop.close() |
Это довольно громоздко. Asyncio предлагает задачи, чтобы сделать работу с фьючерсами и сопрограммами более простой. Задача — это подкласс Future, который оборачивает сопрограмму и его можно отменить.
Сопрограмма не должна принимать явный фьючерс и устанавливать его результат или исключение. Вот как выполнить те же операции с помощью задачи:
1 2 3 4 5 6 7 8 9 10 11 |
import asyncio async def take_your_time(): await asyncio.sleep(1) return 42 loop = asyncio.get_event_loop() task = loop.create_task(take_your_time()) loop.run_until_complete(task) print(task.result()) loop.close() |
Транспорты, протоколы и потоки
Транспорт — это абстракция канала взаимодействия. Транспорт всегда поддерживает определенный протокол. Asyncio предоставляет встроенные реализации для TCP, UDP, SSL и каналов подпроцесса.
Если вы знакомы с сетевым программированием на основе сокетов, вам будет легко освоить транспорты и протоколы. С Asyncio вы получаете стандартное асинхронное сетевое программирование. Давайте рассмотрим печально известный echo-сервер и клиент («hello world» сетевого программирования).
Во-первых, клиент echo реализует класс с именем EchoClient, который является производным от asyncio.Protocol. Он сохраняет свой цикл событий и сообщение, которое он отправляет на сервер при подключении.
При обратном вызове connection_made() он записывает сообщение в транспорт. В методе data_received() он просто выводит ответ сервера, а в методе connection_lost() останавливает цикл обработки событий. При передаче экземпляра класса EchoClient в метод цикла create_connection(), результатом является сопрограмма, которую цикл выполняет, пока не завершится.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
import asyncio class EchoClient(asyncio.Protocol): def __init__(self, message, loop): self.message = message self.loop = loop def connection_made(self, transport): transport.write(self.message.encode()) print('Data sent: {!r}'.format(self.message)) def data_received(self, data): print('Data received: {!r}'.format(data.decode())) def connection_lost(self, exc): print('The server closed the connection') print('Stop the event loop') self.loop.stop() loop = asyncio.get_event_loop() message = 'Hello World!' coro = loop.create_connection(lambda: EchoClient(message, loop), '127.0.0.1', 8888) loop.run_until_complete(coro) loop.run_forever() loop.close() |
Сервер работает аналогично за исключением того, что он работает вечно, ожидая подключения клиентов. После отправки Echo-ответа он также закрывает соединение с клиентом и готов к подключению следующего клиента.
Новый экземпляр EchoServer создается для каждого соединения, поэтому даже если несколько клиентов подключаются одновременно, проблемы конфликтов с атрибутом transport не возникнет.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
import asyncio class EchoServer(asyncio.Protocol): def connection_made(self, transport): peername = transport.get_extra_info('peername') print('Connection from {}'.format(peername)) self.transport = transport def data_received(self, data): message = data.decode() print('Data received: {!r}'.format(message)) print('Send: {!r}'.format(message)) self.transport.write(data) print('Close the client socket') self.transport.close() loop = asyncio.get_event_loop() # Each client connection will create a new protocol instance coro = loop.create_server(EchoServer, '127.0.0.1', 8888) server = loop.run_until_complete(coro) print('Serving on {}'.format(server.sockets[0].getsockname())) loop.run_forever() |
Вот результат после подключения двух клиентов:
1 2 3 4 5 6 7 8 9 |
Serving on ('127.0.0.1', 8888) Connection from ('127.0.0.1', 53248) Data received: 'Hello World!' Send: 'Hello World!' Close the client socket Connection from ('127.0.0.1', 53351) Data received: 'Hello World!' Send: 'Hello World!' Close the client socket |
Потоки предоставляют высокоуровневый API, который основан на сопрограммах и предоставляет абстракции Reader и Writer. Протоколы и транспорты скрыты, нам нет необходимости определять собственные классы, и нет обратных вызовов. Вы просто ожидаете такие события, как соединение и получение данных.
Клиент вызывает функцию open_connection(), которая возвращает объекты Reader и Writer. Чтобы закрыть соединение, он закрывает Reader.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
import asyncio async def tcp_echo_client(message, loop): reader, writer = await asyncio.open_connection( '127.0.0.1', 8888, loop=loop) print('Send: %r' % message) writer.write(message.encode()) data = await reader.read(100) print('Received: %r' % data.decode()) print('Close the socket') writer.close() message = 'Hello World!' loop = asyncio.get_event_loop() loop.run_until_complete(tcp_echo_client(message, loop)) loop.close() |
Сервер также значительно упрощен.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
import asyncio async def handle_echo(reader, writer): data = await reader.read(100) message = data.decode() addr = writer.get_extra_info('peername') print("Received %r from %r" % (message, addr)) print("Send: %r" % message) writer.write(data) await writer.drain() print("Close the client socket") writer.close() loop = asyncio.get_event_loop() coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop) server = loop.run_until_complete(coro) print('Serving on {}'.format(server.sockets[0].getsockname())) loop.run_forever() |
Работа с подпроцессами
Asyncio также охватывает взаимодействия с подпроцессами. Следующая программа запускает другой процесс Python и выполняет код «import this». Это одна из знаменитых пасхалок Python, она выводит «Zen of Python». Проверьте вывод ниже.
Процесс Python запускается в сопрограмме zen() с использованием функции create_subprocess_exec() и связывает стандартный вывод с пайпом. Затем он перебирает стандартные выходные строки, используя await, чтобы дать возможность выполнить другие процессы или сопрограммы, если выходные данные еще не готовы.
Обратите внимание, что в Windows вы должны установить цикл обработки событий ProactorEventLoop , потому что стандартный SelectorEventLoop не поддерживает пайпы.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
import asyncio.subprocess import sys async def zen(): code = 'import this' create = asyncio.create_subprocess_exec( sys.executable, '-c', code, stdout=asyncio.subprocess.PIPE) proc = await create data = await proc.stdout.readline() while data: line = data.decode('ascii').rstrip() print(line) data = await proc.stdout.readline() await proc.wait() if sys.platform == "win32": loop = asyncio.ProactorEventLoop() asyncio.set_event_loop(loop) else: loop = asyncio.get_event_loop() loop.run_until_complete(zen()) Output: The Zen of Python, by Tim Peters Beautiful is better than ugly. Explicit is better than implicit. Simple is better than complex. Complex is better than complicated. Flat is better than nested. Sparse is better than dense. Readability counts. Special cases aren't special enough to break the rules. Although practicality beats purity. Errors should never pass silently. Unless explicitly silenced. In the face of ambiguity, refuse the temptation to guess. There should be one-- and preferably only one --obvious way to do it. Although that way may not be obvious at first unless you're Dutch. Now is better than never. Although never is often better than *right* now. If the implementation is hard to explain, it's a bad idea. If the implementation is easy to explain, it may be a good idea. Namespaces are one honking great idea -- let's do more of those! |
Заключение
Python asyncio — это комплексная среда для асинхронного программирования. Она применима в разных областях и поддерживает как низкоуровневые, так и высокоуровневые API. Она все еще относительно молода и не достаточно хорошо изучена сообществом.
Я уверен, что со временем появятся лучшие практики и новые примеры, которые облегчат использование этой мощной библиотеки.
Автор: Gigi Sayfan
Источник: //code.tutsplus.com
Редакция: Команда webformyself.