## Asynchronous Python
## Summary

* Concepts
* Objects & keywords
Concepts
### Blocking vs non-blocking

* Synchronous code is not efficient when doing IO
* The program waits even while the system performs "blocking" actions (i.e. disk or network access)
* The program is idling while waiting for the system (so the CPU is not working)
* Not very efficient, CPU-wise
### Blocking example

Matteo is getting a coffee. He waits in front of the machine while it's pouring the coffee. He takes the cup and goes back.
### Blocking vs non-blocking

Asynchronous code is tailored to reclaim that lost time:

* Whenever the code would block, the program switches to another CPU-consuming task
* Code is no longer "what you see is what you get"
* The only guarantee is that the code will be executed later on
* Task order is not known
### Non-blocking example

Matteo is getting a coffee. He starts the machine with the cup prepared. He goes back to writing an email. Later on, he grabs his cup.
### Example code

```python
#!/usr/bin/env python3
# rand.py
import asyncio
import logging
import random

logging.basicConfig(level=logging.INFO)


async def makerandom(idx: int, threshold: int = 6) -> int:
    logging.getLogger(str(idx)).info("Initiated makerandom(%s)", idx)
    i = random.randint(0, 10)
    while i <= threshold:
        logging.getLogger(str(idx)).info(
            "makerandom(%s) == %s too low; retrying.", idx, i
        )
        await asyncio.sleep(idx + 1)
        i = random.randint(0, 10)
    logging.getLogger(str(idx)).info("---> Finished: makerandom(%s) == %s", idx, i)
    return i


async def main():
    res = await asyncio.gather(*(makerandom(i, 10 - i - 1) for i in range(3)))
    return res


if __name__ == "__main__":
    random.seed(444)
    r1, r2, r3 = asyncio.run(main())
    logging.getLogger().info("r1: %s, r2: %s, r3: %s", r1, r2, r3)
```
The objects & keywords
## [The event loop](https://docs.python.org/3/library/asyncio-eventloop.html)

- The event loop is what orchestrates everything
- The native one can be replaced by another implementation (see [uvloop](https://github.com/MagicStack/uvloop) for example)
- Once inside the code, it can be accessed using [get_running_loop](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.get_running_loop)
- Ironically, you will almost never use it
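A minimal sketch of grabbing the running loop from inside a coroutine (the function name is just illustrative):

```python
import asyncio


async def show_loop():
    # get_running_loop() only works while a loop is running, i.e. inside a coroutine
    loop = asyncio.get_running_loop()
    print(type(loop))  # the concrete loop class depends on platform/implementation


asyncio.run(show_loop())
```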
## Coroutine function

A `coroutine function` is a function defined with the `async` keyword; calling it produces a `coroutine object`.

```python
async def some_function():
    ...
    return 0
```

Note: Adding the `async` keyword to an existing function makes it a `coroutine function`, but it does not make it "non-blocking".
## [Awaitables - Coroutine object, Task and Future](https://docs.python.org/3/library/asyncio-task.html#awaitables)

* Awaitable: an object that can be used with `await`
* Coroutine object: an awaitable, obtained by calling a coroutine function
* Task: a "wrapper" that schedules a coroutine to run soon
* Future: a special low-level awaitable representing a "future result" (most often obtained from [run_in_executor](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor))
## [Awaitables - Coroutine object, Task and Future](https://docs.python.org/3/library/asyncio-task.html#awaitables)

```python
import asyncio


async def some_function():
    return 42


async def main():
    print("HERE")
    some_function()
    print("THERE")


asyncio.run(main())
```
## [Awaitables - Coroutine object, Task and Future](https://docs.python.org/3/library/asyncio-task.html#awaitables)

```python
import asyncio


async def some_function():
    print("INSIDE")
    return 42


async def main():
    print("HERE")
    res = await some_function()
    print("THERE")
    print(res)


asyncio.run(main())
```
## [Awaitables - Coroutine object, Task and Future](https://docs.python.org/3/library/asyncio-task.html#awaitables)

```python
import asyncio


async def some_function():
    print("INSIDE")
    return 42


async def main():
    print("HERE")
    t = asyncio.create_task(some_function())
    await asyncio.sleep(0)
    print("THERE")
    await t


asyncio.run(main())
```
## [Awaitables - Coroutine object, Task and Future](https://docs.python.org/3/library/asyncio-task.html#awaitables)

Note: The previous behaviour can change depending on the event loop used and on its settings. For example, using the [eager task factory](https://docs.python.org/3/library/asyncio-task.html#asyncio.eager_task_factory) changes the way tasks behave.
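A minimal sketch of that setting (Python 3.12+): with the eager task factory, the task body starts running inside `create_task` instead of waiting for the next loop iteration.

```python
import asyncio


async def some_function():
    print("INSIDE")
    return 42


async def main():
    # Opt into eager task execution (Python 3.12+)
    asyncio.get_running_loop().set_task_factory(asyncio.eager_task_factory)
    print("HERE")
    t = asyncio.create_task(some_function())  # "INSIDE" is printed right here
    print("THERE")
    await t


asyncio.run(main())
```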
Practically
### How do I make my code async AND non-blocking?

- Set up an event loop
- Tag the function as `async`
- Await the coroutine object
- Identify the blocking IO parts of your code
- Switch the problematic code to an async library if possible (see the sketch below)
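A minimal sketch of that last step, using `time.sleep` as a stand-in for any blocking IO call (the function names are illustrative):

```python
import asyncio
import time


async def still_blocking():
    # The async keyword alone does not help: time.sleep blocks the whole event loop
    time.sleep(1)


async def non_blocking():
    # The async equivalent yields control back to the event loop while waiting
    await asyncio.sleep(1)


async def main():
    start = time.perf_counter()
    await asyncio.gather(non_blocking(), non_blocking())  # ~1s total
    print(f"async sleeps: {time.perf_counter() - start:.1f}s")

    start = time.perf_counter()
    await asyncio.gather(still_blocking(), still_blocking())  # ~2s total
    print(f"blocking sleeps: {time.perf_counter() - start:.1f}s")


asyncio.run(main())
```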
### But my code is doing computation/using a lib I cannot switch!

Possibly, [run_in_executor](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor) could be a solution. It is actually what some libraries use to make file IO async (see [aiofiles](https://github.com/Tinche/aiofiles/blob/main/src/aiofiles/threadpool/utils.py#L43) for example).
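A minimal sketch of offloading a blocking call to the default thread-pool executor (`slow_computation` is just a placeholder):

```python
import asyncio
import time


def slow_computation(n: int) -> int:
    # Placeholder for a blocking call you cannot rewrite
    time.sleep(1)
    return n * 2


async def main():
    loop = asyncio.get_running_loop()
    # None -> use the loop's default ThreadPoolExecutor
    result = await loop.run_in_executor(None, slow_computation, 21)
    print(result)  # 42


asyncio.run(main())
```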
### How do I run several tasks concurrently?

[asyncio.gather](https://docs.python.org/3/library/asyncio-task.html#asyncio.gather) (or, since Python 3.11, [asyncio.TaskGroup](https://docs.python.org/3/library/asyncio-task.html#asyncio.TaskGroup)) lets you schedule several awaitables and wait for all of them at once.
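A minimal sketch of both options (the `TaskGroup` part assumes Python 3.11+):

```python
import asyncio


async def work(n: int) -> int:
    await asyncio.sleep(1)
    return n * n


async def main():
    # Option 1: gather returns results in the order the awaitables were passed
    results = await asyncio.gather(work(1), work(2), work(3))
    print(results)  # [1, 4, 9]

    # Option 2: TaskGroup (Python 3.11+) cancels siblings if one task fails
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(work(n)) for n in range(3)]
    print([t.result() for t in tasks])  # [0, 1, 4]


asyncio.run(main())
```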
Quiz
## How can I run several coroutines concurrently?

Let's say I want to scrape a website, or a list of URLs. How would I do it?
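One possible answer, sketched with the third-party `aiohttp` client (any async HTTP library would do; the URLs are just examples):

```python
import asyncio

import aiohttp  # third-party: pip install aiohttp


async def fetch(session: aiohttp.ClientSession, url: str) -> str:
    async with session.get(url) as resp:
        return await resp.text()


async def main(urls: list[str]):
    async with aiohttp.ClientSession() as session:
        # All downloads are in flight at the same time
        pages = await asyncio.gather(*(fetch(session, url) for url in urls))
    for url, page in zip(urls, pages):
        print(url, len(page))


asyncio.run(main(["https://example.com", "https://www.python.org"]))
```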
## How can I try to get the first result of a list of coroutines?

Let's say I'm doing DNS resolution against a list of servers. I want to get the IP from a hostname, and I just want the first result. How would I do it?
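One possible answer, sketched with `asyncio.wait` and `FIRST_COMPLETED` (the `resolve` coroutine is a placeholder for a real DNS query):

```python
import asyncio
import random


async def resolve(hostname: str, server: str) -> str:
    # Placeholder for a real DNS query against one server
    await asyncio.sleep(random.random())
    return f"<some ip> (from {server})"


async def first_answer(hostname: str, servers: list[str]) -> str:
    tasks = [asyncio.create_task(resolve(hostname, s)) for s in servers]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()  # we only care about the fastest answer
    return done.pop().result()


print(asyncio.run(first_answer("example.com", ["1.1.1.1", "8.8.8.8", "9.9.9.9"])))
```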
## How can I store values that can be accessed only by a task and its sub-tasks?

For example, I'm building an API and I want some variables (the user's IP, ...) to be accessible. How would I do it?
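One possible answer, sketched with [contextvars](https://docs.python.org/3/library/contextvars.html), which asyncio copies into every task it creates (the names here are illustrative):

```python
import asyncio
from contextvars import ContextVar

user_ip: ContextVar[str] = ContextVar("user_ip")


async def deep_in_the_code():
    # No need to pass the value around explicitly
    print("handling request from", user_ip.get())


async def handle_request(ip: str):
    token = user_ip.set(ip)  # visible to this task and the tasks it spawns
    try:
        await asyncio.gather(deep_in_the_code(), deep_in_the_code())
    finally:
        user_ip.reset(token)


async def main():
    # Each request task has its own copy of the context, so the values never mix
    await asyncio.gather(handle_request("10.0.0.1"), handle_request("10.0.0.2"))


asyncio.run(main())
```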
## What has gone horribly wrong with the following code?

```python
import asyncio
from contextvars import ContextVar
from uuid import uuid4
import logging

LOGGER = logging.getLogger()


class DbConnection:
    def __init__(self):
        self.id = uuid4()

    async def commit(self):
        await asyncio.sleep(1)

    async def rollback(self):
        await asyncio.sleep(1)

    async def start(self):
        await asyncio.sleep(1)
        return self

    async def close(self):
        await asyncio.sleep(1)


class LazyDbConnection:
    def __init__(self):
        self.id = uuid4()
        self._db_connection = None

    async def __aenter__(self):
        # We defer the connection start at first get_db
        return self

    async def get_db(self):
        if self._db_connection is None:
            # Initialise connection if needed
            self._db_connection = await DbConnection().start()
        return self._db_connection

    async def __aexit__(self, exc_type, exc, tb):
        if self._db_connection is None:
            return
        try:
            if exc is None:
                await self._db_connection.commit()
            await self._db_connection.rollback()
        finally:
            await self._db_connection.close()


# https://docs.python.org/3/library/contextvars.html#asyncio-support
db_singleton: ContextVar[LazyDbConnection] = ContextVar("db", default=None)


async def do_something():
    db = await db_singleton.get().get_db()
    # Do something with db
    return 42


async def do_something_else():
    db = await db_singleton.get().get_db()
    # Do something with db
    return 27


async def handle_request(reader, writer):
    token = db_singleton.set(LazyDbConnection())
    async with db_singleton.get():
        response = sum(await asyncio.gather(do_something(), do_something_else()))
    db_singleton.reset(token)
    writer.write(b"HTTP/1.1 200 OK\r\n")  # status line
    writer.write(b"\r\n")  # headers
    writer.write(str(response).encode())  # body
    writer.close()


async def main():
    srv = await asyncio.start_server(handle_request, "127.0.0.1", 8081)
    async with srv:
        await srv.serve_forever()


asyncio.run(main())

# To test it you can use telnet or curl:
# telnet 127.0.0.1 8081
# curl 127.0.0.1:8081
```
Let's build something
Let's build a small API!

- Download data from https://developer.imdb.com/non-commercial-datasets/
- Bootstrap a small FastAPI server, served by uvicorn (with the uvloop event loop); a starting skeleton is sketched after these steps
- Create some routes for reading movies

Steps

- Use pandas to read the .tsv.gz directly (no need to unzip) and expose it on a route
- Add a /health route returning OK
- Try to call the first & second routes concurrently and time the /health route (you can have a look at Insomnia or Postman to do the queries)
- Make the pandas route non-blocking
- Call and time it again
- Set up an HTTP middleware storing user info & adding a unique id per request, accessible from any part of the code
- Set up SQLAlchemy with the SQLite driver (`aiosqlite`)
- Initialise the db with all the data (try to write a script that is as async/non-blocking as possible, so that copying the data from the TSV to the SQLite db is not too long)
- Replace the TSV reading with SQL queries
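A possible starting skeleton for the first steps, assuming the `title.basics.tsv.gz` file from the IMDb datasets has been downloaded next to the script (the file name, route names and `limit` parameter are assumptions):

```python
# main.py -- run with: uvicorn main:app --loop uvloop
import asyncio

import pandas as pd
from fastapi import FastAPI

app = FastAPI()
DATA_FILE = "title.basics.tsv.gz"  # assumed local copy of the IMDb dataset


@app.get("/health")
async def health() -> str:
    return "OK"


def read_movies(limit: int) -> list[dict]:
    # Blocking: pandas reads and decompresses the file synchronously
    df = pd.read_csv(DATA_FILE, sep="\t", compression="gzip", nrows=limit)
    return df.to_dict(orient="records")


@app.get("/movies")
async def movies(limit: int = 10) -> list[dict]:
    # Offload the blocking pandas call to a thread so /health stays responsive
    return await asyncio.to_thread(read_movies, limit)
```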