## Summary
* Concepts
* Objects & keywords
Concepts
### Blocking vs non-blocking
* Synchronous code is inefficient when doing I/O
* The program waits whenever the system performs a 'blocking' action (i.e. disk or network access)
* The program idles while waiting for the system, so the CPU does no useful work
* Not really efficient, CPU-wise
### Blocking example
Matteo is getting a coffee.
He waits in front of the machine while it's pouring the coffee.
He takes the cup and goes back.
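The coffee scenario above can be sketched in synchronous Python. This is only an illustration: the function names and the durations are made up, and `time.sleep` stands in for any blocking wait.

```python
import time


def make_coffee(pour_seconds: float = 0.2) -> str:
    # time.sleep blocks the whole thread: nothing else can run meanwhile.
    time.sleep(pour_seconds)
    return "coffee"


def write_email(typing_seconds: float = 0.2) -> str:
    time.sleep(typing_seconds)
    return "email"


def blocking_break() -> tuple[list[str], float]:
    start = time.perf_counter()
    # Matteo waits for the coffee, and only then goes back to the email.
    results = [make_coffee(), write_email()]
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = blocking_break()
    print(results, f"{elapsed:.2f}s")  # elapsed is roughly the sum of both waits
```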
### Blocking vs non-blocking
Asynchronous code is designed to recover that lost time
* Whenever the code would block, the program switches to another task that is ready to use the CPU
* Code is no longer "what you see is what you get": execution does not follow the order of the source
* The only guarantee is that the code will be executed later on
* The order in which tasks run is not known in advance
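A small sketch of the ordering point above: three hypothetical workers are started in the order a, b, c, but they finish in the order dictated by their waits. The names and delays are arbitrary.

```python
import asyncio


async def worker(name: str, delay: float, finished: list[str]) -> None:
    await asyncio.sleep(delay)  # yields control back to the event loop
    finished.append(name)       # record the order of completion


async def run_workers() -> list[str]:
    finished: list[str] = []
    await asyncio.gather(
        worker("a", 0.3, finished),
        worker("b", 0.1, finished),
        worker("c", 0.2, finished),
    )
    return finished


if __name__ == "__main__":
    print(asyncio.run(run_workers()))  # completion order, not start order
```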
### Non-blocking example
Matteo is getting a coffee.
He starts the machine with the cup prepared.
He goes back to writing an email.
Later on, he grabs his cup.
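The same scenario can be sketched with `asyncio`. Again, the function names and durations are invented for illustration, and `asyncio.sleep` stands in for a real non-blocking wait.

```python
import asyncio
import time


async def make_coffee(pour_seconds: float = 0.2) -> str:
    await asyncio.sleep(pour_seconds)  # the machine pours; the loop is free
    return "coffee"


async def write_email(typing_seconds: float = 0.2) -> str:
    await asyncio.sleep(typing_seconds)
    return "email"


async def non_blocking_break() -> tuple[list[str], float]:
    start = time.perf_counter()
    coffee_task = asyncio.create_task(make_coffee())  # start the machine
    email = await write_email()                       # go back to the email
    coffee = await coffee_task                        # later on, grab the cup
    return [coffee, email], time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(non_blocking_break())
    print(results, f"{elapsed:.2f}s")  # elapsed is close to the longest wait
```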
### Example code
```python
#!/usr/bin/env python3
# rand.py
import asyncio
import logging
import random

logging.basicConfig(level=logging.INFO)


async def makerandom(idx: int, threshold: int = 6) -> int:
    logging.getLogger(str(idx)).info("Initiated makerandom(%s)", idx)
    i = random.randint(0, 10)
    while i <= threshold:
        logging.getLogger(str(idx)).info(
            "makerandom(%s) == %s too low; retrying.", idx, i
        )
        await asyncio.sleep(idx + 1)
        i = random.randint(0, 10)
    logging.getLogger(str(idx)).info("---> Finished: makerandom(%s) == {%s}", idx, i)
    return i


async def main():
    res = await asyncio.gather(*(makerandom(i, 10 - i - 1) for i in range(3)))
    return res


if __name__ == "__main__":
    random.seed(444)
    r1, r2, r3 = asyncio.run(main())
    logging.getLogger().info("r1: %s, r2: %s, r3: %s", r1, r2, r3)
```
The objects & keywords
## [The event loop](https://docs.python.org/3/library/asyncio-eventloop.html)
- The event loop orchestrates everything
- The native loop can be replaced by another implementation (see [uvloop](https://github.com/MagicStack/uvloop) for example)
- From inside running async code, it can be accessed using [get_running_loop](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.get_running_loop)
- Ironically, you will almost never use it directly
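As a minimal sketch of the access pattern above: `asyncio.get_running_loop()` returns the loop when called from a coroutine, and raises `RuntimeError` when no loop is running in the current thread. The helper names are hypothetical.

```python
import asyncio


async def loop_info() -> tuple[bool, bool]:
    loop = asyncio.get_running_loop()  # only valid while a loop is running
    return isinstance(loop, asyncio.AbstractEventLoop), loop.is_running()


def outside_loop() -> bool:
    """Return True if no event loop is running in this thread."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return True
    return False


if __name__ == "__main__":
    print(asyncio.run(loop_info()))
    print(outside_loop())
```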
## Coroutine function
A `coroutine function` is a function defined with the `async` keyword; calling it returns a `coroutine object`
```python
async def some_function():
    ...
    return 0
```
Note: Adding the `async` keyword to an existing function makes it a `coroutine function`, but it does not make it "non-blocking"
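To illustrate the note, here is a sketch comparing two hypothetical coroutine functions: one calls the blocking `time.sleep`, the other awaits `asyncio.sleep`. Both are `async`, but only the second lets the event loop run something else while it waits.

```python
import asyncio
import time


async def blocking_wait(delay: float) -> None:
    time.sleep(delay)  # blocks the event loop: no other task can run


async def cooperative_wait(delay: float) -> None:
    await asyncio.sleep(delay)  # hands control back to the event loop


async def timed(coro_fn, delay: float = 0.2) -> float:
    start = time.perf_counter()
    await asyncio.gather(coro_fn(delay), coro_fn(delay))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"blocking:    {asyncio.run(timed(blocking_wait)):.2f}s")     # about 2 x delay
    print(f"cooperative: {asyncio.run(timed(cooperative_wait)):.2f}s")  # about 1 x delay
```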
## [Awaitables - Coroutine object, Task and Future](https://docs.python.org/3/library/asyncio-task.html#awaitables)
* Awaitable: an object that can be used with `await`
* Coroutine object: an awaitable, obtained by calling a coroutine function
* Task: a "wrapper" that schedules a coroutine to run on the event loop soon
* Future: a special low-level awaitable object representing a result that will be available in the future (most likely obtained from [run_in_executor](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor))
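The three kinds of awaitable can be inspected side by side. This is a sketch assuming the default asyncio event loop; `answer` and `inspect_awaitables` are hypothetical names, and the future is resolved by hand only for demonstration.

```python
import asyncio
import inspect


async def answer() -> int:
    return 42


async def inspect_awaitables() -> dict[str, bool]:
    coro = answer()                       # coroutine object, not started yet
    task = asyncio.create_task(answer())  # scheduled on the running loop
    future = asyncio.get_running_loop().create_future()
    future.set_result(42)                 # normally set by low-level code

    checks = {
        "coroutine": inspect.iscoroutine(coro),
        "task": isinstance(task, asyncio.Task),
        "task_is_future": isinstance(task, asyncio.Future),
        "future": isinstance(future, asyncio.Future),
        "all_awaitable": all(inspect.isawaitable(x) for x in (coro, task, future)),
    }
    results = [await coro, await task, await future]
    checks["same_result"] = results == [42, 42, 42]
    return checks


if __name__ == "__main__":
    print(asyncio.run(inspect_awaitables()))
```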
## [Awaitables - Coroutine object, Task and Future](https://docs.python.org/3/library/asyncio-task.html#awaitables)
```python
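import asyncio


async def some_function():
    return 42


async def main():
    print("HERE")
    # Not awaited: this only creates a coroutine object, which never runs
    # (Python emits a "coroutine ... was never awaited" RuntimeWarning).
    some_function()
    print("THERE")


asyncio.run(main())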
```
## [Awaitables - Coroutine object, Task and Future](https://docs.python.org/3/library/asyncio-task.html#awaitables)
```python
import asyncio


async def some_function():
    print("INSIDE")
    return 42


async def main():
    print("HERE")
    res = await some_function()
    print("THERE")
    print(res)


asyncio.run(main())
```
## [Awaitables - Coroutine object, Task and Future](https://docs.python.org/3/library/asyncio-task.html#awaitables)
```python
import asyncio


async def some_function():
    print("INSIDE")
    return 42


async def main():
    print("HERE")
    t = asyncio.create_task(some_function())
    await asyncio.sleep(0)
    print("THERE")
    await t


asyncio.run(main())
```
## [Awaitables - Coroutine object, Task and Future](https://docs.python.org/3/library/asyncio-task.html#awaitables)
Note: The previous behaviour can change depending on the event loop used and on its settings.
For example, using the [eager task factory](https://docs.python.org/3/library/asyncio-task.html#asyncio.eager_task_factory) changes when a task starts running.
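A sketch of that difference, assuming Python 3.12 or later (where `asyncio.eager_task_factory` was introduced). The events are collected in a list instead of printed, and there is deliberately no `await asyncio.sleep(0)` after `create_task`, so the only thing that changes the order is the task factory.

```python
import asyncio


async def some_function(events: list[str]) -> int:
    events.append("INSIDE")
    return 42


async def run_order(eager: bool) -> list[str]:
    if eager:
        # Requires Python 3.12+: tasks start running as soon as they are created.
        asyncio.get_running_loop().set_task_factory(asyncio.eager_task_factory)
    events = ["HERE"]
    t = asyncio.create_task(some_function(events))
    events.append("THERE")
    await t
    return events


if __name__ == "__main__":
    print(asyncio.run(run_order(eager=False)))
    if hasattr(asyncio, "eager_task_factory"):
        print(asyncio.run(run_order(eager=True)))
```

With the default factory the task body runs only once `main` yields to the loop; with the eager factory it runs synchronously inside `create_task` until its first suspension point.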
Practically
### How do I make my code async AND non-blocking?
- Set up an event loop
- Mark the function as `async`
- Await the coroutine object
- Identify the blocking I/O parts of your code
- Switch the problematic code to an async library if possible
### But my code is doing computation/using a lib I cannot switch!
Possibly, [run_in_executor](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor)
could be a solution.
It's actually what some libraries use to make file I/O async (see [aiofiles](https://github.com/Tinche/aiofiles/blob/main/src/aiofiles/threadpool/utils.py#L43) for example).
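A minimal sketch of `run_in_executor`, where the hypothetical `blocking_io` stands in for a library call that cannot be made async. Passing `None` as the executor uses the loop's default thread pool.

```python
import asyncio
import time


def blocking_io(delay: float) -> str:
    time.sleep(delay)  # stand-in for a blocking library call
    return "done"


async def run_both(delay: float = 0.2) -> tuple[list[str], float]:
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    # Each call runs in a worker thread, so the event loop stays responsive.
    results = await asyncio.gather(
        loop.run_in_executor(None, blocking_io, delay),
        loop.run_in_executor(None, blocking_io, delay),
    )
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(run_both())
    print(results, f"{elapsed:.2f}s")  # both waits overlap
```

Threads help with blocking I/O; for CPU-bound Python code the GIL usually limits the gain, and a `concurrent.futures.ProcessPoolExecutor` passed as the first argument may be a better fit.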
### How do I run several tasks concurrently?
Possibly, [asyncio.gather](https://docs.python.org/3/library/asyncio-task.html#asyncio.gather)
could be a solution.
It's actually what `rand.py` uses in the example code above to run several `makerandom` coroutines at once.
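For coroutines, a common option is `asyncio.gather`, which returns results in the order the awaitables were passed in, whatever order they finish in. In this sketch `fetch` is a hypothetical stand-in for a real network call, and the delays are arbitrary.

```python
import asyncio


async def fetch(name: str, delay: float) -> str:
    # Hypothetical stand-in for a real network call.
    await asyncio.sleep(delay)
    return f"content of {name}"


async def fetch_all(jobs: list[tuple[str, float]]) -> list[str]:
    # All coroutines run concurrently; results keep the input order.
    return await asyncio.gather(*(fetch(name, delay) for name, delay in jobs))


if __name__ == "__main__":
    print(asyncio.run(fetch_all([("slow", 0.2), ("fast", 0.05)])))
```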
Quiz
## How can I run several coroutines concurrently?
Let's say I want to scrape a website, or a list of URLs.
How would I do it?
## How can I get the first result from a list of coroutines?
Let's say I'm doing DNS resolution against a list of servers.
I want to get the IP for a hostname, and I only need the first result.
How would I do it?
## How can I store values that can be accessed only by a task or its sub-tasks?
For example, I'm building an API and I want some variables (IP of the user, ...) to be accessible.
How would I do it?
## What has gone horribly wrong with the following code ?
```python
import asyncio
from contextvars import ContextVar
from uuid import uuid4
import logging

LOGGER = logging.getLogger()


class DbConnection:
    def __init__(self):
        self.id = uuid4()

    async def commit(self):
        await asyncio.sleep(1)

    async def rollback(self):
        await asyncio.sleep(1)

    async def start(self):
        await asyncio.sleep(1)
        return self

    async def close(self):
        await asyncio.sleep(1)


class LazyDbConnection:
    def __init__(self):
        self.id = uuid4()
        self._db_connection = None

    async def __aenter__(self):
        # We defer the connection start at first get_db
        return self

    async def get_db(self):
        if self._db_connection is None:
            # Initialise connection if needed
            self._db_connection = await DbConnection().start()
        return self._db_connection

    async def __aexit__(self, exc_type, exc, tb):
        if self._db_connection is None:
            return
        try:
            if exc is None:
                await self._db_connection.commit()
            await self._db_connection.rollback()
        finally:
            await self._db_connection.close()


# https://docs.python.org/3/library/contextvars.html#asyncio-support
db_singleton: ContextVar[LazyDbConnection] = ContextVar("db", default=None)


async def do_something():
    db = await db_singleton.get().get_db()
    # Do something with db
    return 42


async def do_something_else():
    db = await db_singleton.get().get_db()
    # Do something with db
    return 27


async def handle_request(reader, writer):
    token = db_singleton.set(LazyDbConnection())
    async with db_singleton.get():
        response = sum(await asyncio.gather(do_something(), do_something_else()))
    db_singleton.reset(token)
    writer.write(b"HTTP/1.1 200 OK\r\n")  # status line
    writer.write(b"\r\n")  # headers
    writer.write(str(response).encode())  # body
    writer.close()


async def main():
    srv = await asyncio.start_server(handle_request, "127.0.0.1", 8081)
    async with srv:
        await srv.serve_forever()


asyncio.run(main())
# To test it you can use telnet or curl:
# telnet 127.0.0.1 8081
# curl 127.0.0.1:8081
```
Let's build something
Let's build a small API!
- Download data from https://developer.imdb.com/non-commercial-datasets/
- Bootstrap a small FastAPI server, served by uvicorn (with the uvloop event loop)
- Create some routes for reading movies
Steps
- Use pandas to read the .tsv.gz directly (no need to unzip) and expose it on a route
- Add a /health route returning OK
- Try to call the first and second routes concurrently and time the /health route (you can use Insomnia or Postman to send the queries)
- Make the pandas route non-blocking
- Call both routes again and time them again
- Set up an HTTP middleware that stores user info and adds a unique id per request, accessible from any part of the code
- Set up SQLAlchemy with an async SQLite driver (`aiosqlite`)
- Initialise the db with all the data (try to write a script that is as async/non-blocking as possible, so that copying the data from the TSV to the SQLite db does not take too long)
- Replace TSV reading with SQL queries