FastAPIでPython Async IOをマスターする
Emily Parker
Product Engineer · Leapcell

Pythonはインタプリタ言語であるため、Python + Djangoの組み合わせなど、バックエンド開発で使用する場合、Java + Springと比較して、応答時間がわずかに長くなります。ただし、コードが合理的である限り、その差はそれほど大きくありません。Djangoがマルチプロセスモードを使用している場合でも、その同時処理能力は依然としてはるかに弱いです。Pythonには、同時処理能力を向上させるためのいくつかのソリューションがあります。たとえば、非同期フレームワークFastAPIを、その非同期機能とともに使用すると、I/O集中型タスクの同時処理能力を大幅に向上させることができます。FastAPIは、最速のPythonフレームワークの1つです。
FastAPIを例として
まず、FastAPIの使用方法を簡単に見てみましょう。
例1:デフォルトのネットワーク非同期IO
インストール:
pip install fastapi
簡単なサーバーサイドコード:
# main.py
from typing import Union
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
async def read_root():
return {"Hello": "World"}
起動:
uvicorn main:app --reload
他のフレームワークと比較して、FastAPIのインターフェースにはasync
キーワードが追加されているだけであることがわかります。async
キーワードは、インターフェースを非同期として定義します。戻り値だけからは、FastAPIと他のPythonフレームワークの違いを区別することはできません。違いは同時アクセスにあります。FastAPIのサーバーのスレッドがhttp://127.0.0.1:8000/
などのルートリクエストを処理するときに、ネットワークI/Oが発生した場合、それを待たずに他のリクエストを処理します。ネットワークI/Oが完了すると、実行が再開されます。この非同期機能により、I/O集中型タスクの処理能力が向上します。
例2:明示的なネットワーク非同期IO
別の例を見てみましょう。ビジネスコードでは、明示的な非同期ネットワークリクエストが開始されます。このネットワークI/Oの場合、ルートリクエストと同様に、FastAPIも非同期で処理します。
# app.py
from fastapi import FastAPI, HTTPException
import httpx
app = FastAPI()
# 非同期GETリクエストの例
@app.get("/external-api")
async def call_external_api():
url = "https://leapcell.io"
async with httpx.AsyncClient() as client:
response = await client.get(url)
if response.status_code!= 200:
raise HTTPException(status_code=response.status_code, detail="データの取得に失敗しました")
return response.json()
データベースI/Oを非同期にする場合は、データベースドライバーまたはORMからの非同期操作のサポートが必要です。
非同期IO
FastAPIの非同期処理の中核となる実装は非同期I/O
です。FastAPIを使用せずに、非同期I/Oを使用して非同期処理機能を備えたサーバーを直接起動できます。
import asyncio from aiohttp import web async def index(request): await asyncio.sleep(1) # I/O操作をシミュレート return web.Response(text='{"Hello": "World"}', content_type='application/json') async def init(loop): # イベントループを使用してWebリクエストを監視 app = web.Application(loop=loop) app.router.add_route('GET', '/', index) # サーバーを起動し、イベントループはWebリクエストを監視および処理します srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000) print('Server started at http://127.0.0.1:8000...') return srv # イベントループを明示的に取得 loop = asyncio.get_event_loop() # イベントループを開始 loop.run_until_complete(init(loop)) loop.run_forever()
この例を起動すると、http://127.0.0.1:8000/
の戻り値は例1と同じになります。非同期I/Oの基盤となる実装原理は、「コルーチン」と「イベントループ」です。
コルーチン
async def index(request): await asyncio.sleep(1) # I/O操作をシミュレート return web.Response(text='{"Hello": "World"}', content_type='application/json')
関数index
はasync def
で定義されており、これはコルーチンであることを意味します。await
キーワードは、I/O操作の前に使用され、実行スレッドにこのI/O操作を待機しないように指示します。通常の関数の呼び出しはスタックを介して実装され、関数は1つずつ呼び出して実行することしかできません。ただし、コルーチンは特別な種類の関数(協調スレッドではありません)です。これにより、スレッドはawait
マークで実行を一時停止し、他のタスクを実行するように切り替えることができます。I/O操作が完了すると、実行が継続されます。
複数のコルーチンが同時に実行される効果を見てみましょう。
import asyncio from datetime import datetime async def coroutine3(): print(f"Coroutine 3 started at {datetime.now()}") await asyncio.sleep(1) # I/O操作をシミュレート print(f"Coroutine 3 finished at {datetime.now()}") async def coroutine2(): print(f"Coroutine 2 started at {datetime.now()}") await asyncio.sleep(1) # I/O操作をシミュレート print(f"Coroutine 2 finished at {datetime.now()}") async def coroutine1(): print(f"Coroutine 1 started at {datetime.now()}") await asyncio.sleep(1) # I/O操作をシミュレート print(f"Coroutine 1 finished at {datetime.now()}") async def main(): print("Main started") # コルーチンを同時に実行するためのタスクを作成 task1 = asyncio.create_task(coroutine1()) task2 = asyncio.create_task(coroutine2()) task3 = asyncio.create_task(coroutine3()) # すべてのタスクが完了するのを待つ await task1 await task2 await task3 print("Main finished") # メインコルーチンを実行 asyncio.run(main())
出力:
Main started
Coroutine 1 started at 2024-12-27 12:28:01.661251
Coroutine 2 started at 2024-12-27 12:28:01.661276
Coroutine 3 started at 2024-12-27 12:28:01.665012
Coroutine 1 finished at 2024-12-27 12:28:02.665125
Coroutine 2 finished at 2024-12-27 12:28:02.665120
Coroutine 3 finished at 2024-12-27 12:28:02.665120
Main finished
スレッドは3つのタスクを1つずつ実行しないことがわかります。I/O操作が発生すると、他のタスクを実行するように切り替わります。I/O操作が完了すると、実行が継続されます。また、3つのコルーチンが基本的に同時にI/O操作を待機し始めるため、最終的な実行完了時間は基本的に同じであることがわかります。イベントループはここでは明示的に使用されていませんが、asyncio.run
は暗黙的に使用します。
ジェネレーター
コルーチンはジェネレーターを介して実装されます。ジェネレーターは関数の実行を一時停止し、再開することもできます。これらはコルーチンの特性です。
def simple_generator(): print("First value") yield 1 print("Second value") yield 2 print("Third value") yield 3 # simple_generatorはジェネレーター関数、genはジェネレーター gen = simple_generator() print(next(gen)) # 出力: First value \n 1 print(next(gen)) # 出力: Second value \n 2 print(next(gen)) # 出力: Third value \n 3
next()
でジェネレーターを実行すると、yield
が発生すると一時停止します。next()
が再度実行されると、最後に一時停止したyield
から実行が継続されます。Python 3.5より前は、コルーチンも「型アノテーション」+ yeild
で記述されていました。Python 3.5以降では、async def
+ await
が使用されます。
import asyncio from datetime import datetime @asyncio.coroutine def my_coroutine(): print("Start coroutine", datetime.now()) # asyncio.sleep(1)への非同期呼び出し: yield from asyncio.sleep(1) print("End coroutine", datetime.now()) # EventLoopを取得 loop = asyncio.get_event_loop() # コルーチンを実行 loop.run_until_complete(my_coroutine()) loop.close()
ジェネレーターの一時停止および再開機能は、コルーチン以外にも多くのことに使用できます。たとえば、ループ中に計算したり、アルゴリズムを保存したりできます。たとえば、パスカルの三角形(各行の両端は1で、他の位置の数値は上の2つの数値の合計)を実装します。
def pascal_triangle():
row = [1]
while True:
yield row
new_row = [1] # 各行の最初の要素は常に1
for i in range(1, len(row)):
new_row.append(row[i - 1] + row[i])
new_row.append(1) # 各行の最後の要素は常に1
row = new_row
# パスカルの三角形の最初の5行を生成して印刷
triangle = pascal_triangle()
for _ in range(5):
print(next(triangle))
出力:
[1]
[1, 1]
[1, 2, 1]
[1, 3, 3, 1]
[1, 4, 6, 4, 1]
イベントループ
コルーチンの実行は一時停止できるため、コルーチンはいつ実行を再開しますか?これには、イベントループを使用して実行スレッドに指示する必要があります。
# EventLoopを取得 loop = asyncio.get_event_loop() # イベントループはコルーチンを実行 loop.run_until_complete(my_coroutine()) loop.close()
イベントループはI/O多重化技術を使用し、コルーチンが実行を継続できるイベントを絶えず監視するために循環します。実行可能になると、スレッドはコルーチンの実行を継続します。
I/O多重化技術
I/O多重化を簡単な方法で理解するには、私は宅配ステーションのボスです。各宅配業者にタスクの完了について積極的に尋ねる必要はありません。代わりに、宅配業者はタスクを完了すると、自分から私のところに来ます。これにより、タスク処理能力が向上し、より多くのことを実行できます。
select
、poll
、およびepoll
はすべてI/O多重化を実現できます。select
およびpoll
と比較して、epoll
の方がパフォーマンスが優れています。Linuxは通常、デフォルトでepoll
を使用し、macOSはepoll
に似たkqueue
を使用し、同様のパフォーマンスを発揮します。
イベントループを使用したソケットサーバー
import selectors
import socket
# selectorsオブジェクトを作成します。これはepollの実装と同等であり、Linuxで実行する場合
sel = selectors.DefaultSelector()
# リクエスト受信イベント処理関数。新しい接続を受け入れ、読み取りイベントを登録します
def accept(sock, mask):
conn, addr = sock.accept() # 接続を受け入れる
print('Accepted connection from', addr)
conn.setblocking(False)
sel.register(conn, selectors.EVENT_READ, read) # 読み取りイベントを登録
# リクエスト読み取りイベント処理関数。リクエストデータを読み取り、HTTP応答を送信して、接続を閉じます。
def read(conn, mask):
data = conn.recv(100) # 接続からデータを読み取る
print('response to')
response = "HTTP/1.1 200 OK\r\n" \
"Content-Type: application/json\r\n" \
"Content-Length: 18\r\n" \
"Connection: close\r\n" \
"\r\n" \
"{\"Hello\": \"World\"}"
conn.send(response.encode()) # データをエコー
print('Closing connection')
sel.unregister(conn) # イベントを登録解除
conn.close() # 接続を閉じる
# サーバーソケットを作成
sock = socket.socket()
sock.bind(('localhost', 8000))
sock.listen()
sock.setblocking(False)
# acceptイベントを登録
sel.register(sock, selectors.EVENT_READ, accept)
print("Server is running on port 8000...")
# イベントループ
while True:
# これはリクエストがない場合はブロックされます
events = sel.select() # 準備ができているファイル記述子(イベント)を選択します
print("events length: ", len(events))
for key, mask in events:
callback = key.data # イベント処理関数を取得
print("handler_name:", callback.__name__)
callback(key.fileobj, mask) # イベント処理関数を呼び出す
サーバーソケットを起動して、指定されたポートを監視します。Linuxシステムで実行している場合、selectors
はデフォルトでepoll
を実装として使用します。このコードはepoll
を使用して、リクエスト受信イベント(acceptイベント)を登録します。新しいリクエストが到着すると、epoll
がトリガーされてイベント処理関数が実行され、同時に、リクエストデータを処理して応答するための読み取りイベント(readイベント)が登録されます。Web側からhttp://127.0.0.1:8000/
でアクセスすると、戻り値は例1と同じになります。サーバー実行ログ:
Server is running on port 8000...
events length: 1
handler_name: accept
Accepted connection from ('127.0.0.1', 60941)
events length: 1
handler_name: read
response to
Closing connection
ソケットサーバー
ソケットを直接使用してサーバーを起動します。ブラウザでhttp://127.0.0.1:8080/
でアクセスするか、curl http://127.0.0.1:8080/
を使用すると、{"Hello": "World"}
が返されます
import socket from datetime import datetime # TCPソケットを作成 server_socket = socket.socket() # ソケットを指定されたIPアドレスとポート番号にバインド server_socket.bind(('127.0.0.1', 8001)) # 着信接続のリスニングを開始 server_socket.listen(5) # クライアント接続を受け入れるループ while True: print("%s Waiting for a connection..." % datetime.now()) client_socket, addr = server_socket.accept() # これはブロックされ、クライアント接続を待機します print(f"{datetime.now()} Got connection from {addr}") # クライアントデータを受信 data = client_socket.recv(1024) print(f"Received: {data.decode()}") # 応答データを送信 response = "HTTP/1.1 200 OK\r\n" \ "Content-Type: application/json\r\n" \ "Content-Length: 18\r\n" \ "Connection: close\r\n" \ "\r\n" \ "{\"Hello\": \"World\"}" client_socket.sendall(response.encode()) # クライアントソケットを閉じる client_socket.close()
curl http://127.0.0.1:8001/
でアクセスすると、サーバー実行ログ:
2024-12-27 12:53:36.711732 Waiting for a connection...
2024-12-27 12:54:30.715928 Got connection from ('127.0.0.1', 64361)
Received: GET / HTTP/1.1
Host: 127.0.0.1:8001
User-Agent: curl/8.4.0
Accept: */*
まとめ
非同期I/Oは、「コルーチン」と「イベントループ」を使用して最下層で実装されます。「コルーチン」は、スレッドが実行中にマークされたI/O操作に遭遇したときに、I/Oが完了するのを待つ必要がなく、一時停止してスレッドがブロックせずに他のタスクを実行できるようにします。「イベントループ」は、I/O多重化技術を使用し、I/Oイベントを監視するために常にサイクルします。特定のI/Oイベントが完了すると、対応するコールバックがトリガーされ、コルーチンが実行を継続できるようになります。
Leapcell: FastAPIおよびその他のPythonアプリケーションに最適なプラットフォーム:
最後に、Flask/FastAPIのデプロイに最適なプラットフォームLeapcellを紹介します。
Leapcellは、最新の分散アプリケーション向けに特別に設計されたクラウドコンピューティングプラットフォームです。その従量課金制の価格モデルにより、アイドルコストが発生しないことが保証されます。つまり、ユーザーは実際に使用するリソースに対してのみ支払います。
WSGI/ASGIアプリケーションに対するLeapcellの独自の利点:
1.多言語サポート
- JavaScript、Python、Go、またはRustでの開発をサポート。
無制限のプロジェクトの無料デプロイ
- 使用量に基づいてのみ課金。リクエストがない場合は課金されません。
2.比類のない費用対効果
- 従量課金制で、アイドル料金はかかりません。
- たとえば、25ドルは694万件のリクエストをサポートでき、平均応答時間は60ミリ秒です。
3.簡素化された開発者エクスペリエンス
- 簡単なセットアップのための直感的なユーザーインターフェース。
- 完全に自動化されたCI/CDパイプラインとGitOps統合。
- リアルタイムのメトリックとログにより、実用的な洞察を提供。
4.簡単なスケーラビリティと高パフォーマンス
- 高い同時実行を簡単に処理するための自動スケーリング。
- 運用オーバーヘッドがゼロであるため、開発者は開発に集中できます。
詳細については、ドキュメントをご覧ください!
Leapcell Twitter:https://x.com/LeapcellHQ