High-Performance Python: Asyncio
Takashi Yamamoto
Infrastructure Engineer · Leapcell

Concurrency プログラミングとは、複数のタスクを同時に実行するプログラミング手法です。Pythonでは、asyncio
は非同期プログラミングを実装するための強力なツールです。コルーチンの概念に基づいて、asyncio
はI/O集中型のタスクを効率的に処理できます。この記事では、asyncio
の基本的な原理と使用法を紹介します。
なぜasyncioが必要なのか
I/O操作を処理する際に、通常のシングルスレッドと比較してマルチスレッドを使用すると効率が大幅に向上することはわかっています。では、なぜasyncio
がまだ必要なのですか?
マルチスレッドには多くの利点があり、広く使用されていますが、特定の制限もあります。
- たとえば、マルチスレッドの実行プロセスは中断されやすく、競合状態が発生する可能性があります。
- さらに、スレッドの切り替え自体にも一定のコストがかかり、スレッドの数を無制限に増やすことはできません。したがって、I/O操作が非常に重い場合、マルチスレッドは高い効率と高品質の要件を満たすことができません。
asyncio
が登場したのは、まさにこれらの問題を解決するためです。
Sync VS Async
まず、Sync(同期)とAsync(非同期)の概念を区別しましょう。
- Syncとは、操作が次々と実行されることを意味します。次の操作は、前の操作が完了した後にのみ実行できます。
- Asyncとは、異なる操作を交互に実行できることを意味します。操作の1つがブロックされた場合、プログラムは待機せずに、実行可能な操作を見つけて続行します。
asyncioの仕組み
- コルーチン:
asyncio
はコルーチンを使用して非同期操作を実現します。コルーチンは、async
キーワードで定義された特別な関数です。コルーチンでは、await
キーワードを使用して、現在のコルーチンの実行を一時停止し、非同期操作が完了するのを待つことができます。 - イベントループ:イベントループは、
asyncio
の中核的なメカニズムの1つです。コルーチンのスケジュールと実行、およびコルーチン間の切り替えの処理を担当します。イベントループは、実行可能なタスクを常にポーリングします。タスクの準備が整うと(I/O操作の完了やタイマーの期限切れなど)、イベントループはそれを実行キューに入れ、次のタスクに進みます。 - 非同期タスク:
asyncio
では、非同期タスクを作成してコルーチンを実行します。非同期タスクは、asyncio.create_task()
関数によって作成されます。この関数は、コルーチンをawaitableなオブジェクトにカプセル化し、処理のためにイベントループに送信します。 - 非同期I/O操作:
asyncio
は、一連の非同期I/O操作(ネットワークリクエスト、ファイルの読み書きなど)を提供します。これらは、await
キーワードを介してコルーチンおよびイベントループとシームレスに統合できます。非同期I/O操作を使用することにより、I/O完了の待機中のブロッキングを回避し、プログラムのパフォーマンスと同時実行性を向上させることができます。 - コールバック:
asyncio
は、コールバック関数を使用して非同期操作の結果を処理することもサポートしています。asyncio.ensure_future()
関数を使用して、コールバック関数をawaitableなオブジェクトにカプセル化し、処理のためにイベントループに送信できます。 - 並行実行:
asyncio
は、複数のコルーチンタスクを並行して実行できます。イベントループは、タスクの準備状況に応じてコルーチンの実行を自動的にスケジュールし、効率的な並行プログラミングを実現します。
要するに、asyncio
の動作原理は、コルーチンとイベントループのメカニズムに基づいています。非同期操作にコルーチンを使用し、イベントループにコルーチンのスケジュールと実行を担当させることにより、asyncio
は効率的な非同期プログラミングモデルを実現します。
コルーチンと非同期プログラミング
コルーチンは、asyncio
における重要な概念です。これらは、スレッド切り替えのオーバーヘッドなしに、タスク間をすばやく切り替えることができる軽量な実行ユニットです。コルーチンはasync
キーワードで定義でき、await
キーワードを使用してコルーチンの実行を一時停止し、特定の操作が完了した後に再開します。
コルーチンを使用して非同期プログラミングを行う方法を示す簡単なサンプルコードを次に示します。
import asyncio async def hello(): print("Hello") await asyncio.sleep(1) # 時間のかかる操作をシミュレート print("World") # イベントループを作成 loop = asyncio.get_event_loop() # コルーチンをイベントループに追加して実行 loop.run_until_complete(hello())
この例では、関数hello()
はasync
キーワードで定義されたコルーチンです。コルーチン内では、await
を使用してその実行を一時停止できます。ここでは、asyncio.sleep(1)
を使用して時間のかかる操作をシミュレートしています。run_until_complete()
メソッドは、コルーチンをイベントループに追加して実行します。
非同期I/O操作
asyncio
は主に、ネットワークリクエスト、ファイルの読み書きなど、I/O集中型のタスクを処理するために使用されます。非同期I/O操作のための一連のAPIを提供し、これらをawait
キーワードと組み合わせて使用すると、非同期プログラミングを簡単に実現できます。
asyncio
を使用して非同期ネットワークリクエストを行う方法を示す簡単なサンプルコードを次に示します。
import asyncio import aiohttp async def fetch(session, url): async with session.get(url) as response: return await response.text() async def main(): async with aiohttp.ClientSession() as session: html = await fetch(session, 'https://www.example.com') print(html) # イベントループを作成 loop = asyncio.get_event_loop() # コルーチンをイベントループに追加して実行 loop.run_until_complete(main())
この例では、ネットワークリクエストにaiohttp
ライブラリを使用しています。関数fetch()
はコルーチンです。session.get()
メソッドを介して非同期GETリクエストを開始し、await
キーワードを使用して応答が返されるのを待ちます。関数main()
は別のコルーチンです。内部で再利用するためにClientSession
オブジェクトを作成し、次にfetch()
メソッドを呼び出してWebページのコンテンツを取得して出力します。
注意:ここでは、requests
ライブラリの代わりにaiohttp
を使用しています。これは、requests
ライブラリがasyncio
と互換性がないのに対し、aiohttp
ライブラリは互換性があるためです。asyncio
を十分に活用するには、特にその強力な機能を発揮するには、多くの場合、対応するPythonライブラリが必要です。
複数のタスクの並行実行
asyncio
は、asyncio.gather()
やasyncio.wait()
など、複数のタスクを並行して実行するためのいくつかのメカニズムも提供します。これらのメカニズムを使用して複数のコルーチンタスクを並行して実行する方法を示すサンプルコードを次に示します。
import asyncio async def task1(): print("Task 1 started") await asyncio.sleep(1) print("Task 1 finished") async def task2(): print("Task 2 started") await asyncio.sleep(2) print("Task 2 finished") async def main(): await asyncio.gather(task1(), task2()) # イベントループを作成 loop = asyncio.get_event_loop() # コルーチンをイベントループに追加して実行 loop.run_until_complete(main())
この例では、2つのコルーチンタスクtask1()
とtask2()
を定義します。どちらも時間がかかる操作を実行します。コルーチンmain()
は、asyncio.gather()
を介してこれらの2つのタスクを同時に開始し、それらが完了するのを待ちます。並行実行により、プログラムの実行効率を向上させることができます。
どのように選択するか?
実際のプロジェクトでは、マルチスレッドとasyncio
のどちらを選択する必要がありますか?ある大物がそれを鮮やかに要約しました。
if io_bound: if io_slow: print('Use Asyncio') else: print('Use multi-threading') elif cpu_bound: print('Use multi-processing')
- I/Oバウンドであり、I/O操作が遅く、多くのタスク/スレッドの連携が必要な場合は、
asyncio
を使用する方が適切です。 - I/Oバウンドであるが、I/O操作が高速で、必要なタスク/スレッドの数が限られている場合は、マルチスレッドで十分です。
- CPUバウンドの場合は、プログラムの実行効率を向上させるためにマルチプロセッシングが必要です。
練習
リストを入力します。リスト内の各要素について、0からその要素までのすべての整数の2乗の合計を計算します。
同期実装
import time def cpu_bound(number): return sum(i * i for i in range(number)) def calculate_sums(numbers): for number in numbers: cpu_bound(number) def main(): start_time = time.perf_counter() numbers = [10000000 + x for x in range(20)] calculate_sums(numbers) end_time = time.perf_counter() print('Calculation takes {} seconds'.format(end_time - start_time)) if __name__ == '__main__': main()
実行時間はCalculation takes 16.00943413000002 seconds
です
concurrent.futuresを使用した非同期実装
import time from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed def cpu_bound(number): return sum(i * i for i in range(number)) def calculate_sums(numbers): with ProcessPoolExecutor() as executor: results = executor.map(cpu_bound, numbers) results = [result for result in results] print(results) def main(): start_time = time.perf_counter() numbers = [10000000 + x for x in range(20)] calculate_sums(numbers) end_time = time.perf_counter() print('Calculation takes {} seconds'.format(end_time - start_time)) if __name__ == '__main__': main()
実行時間はCalculation takes 7.314132894999999 seconds
です
この改善されたコードでは、concurrent.futures.ProcessPoolExecutor
を使用してプロセスプールを作成し、次にexecutor.map()
メソッドを使用してタスクを送信して結果を取得します。executor.map()
を使用した後は、結果を取得する必要がある場合、結果をリストに反復するか、他の方法を使用して結果を処理できることに注意してください。
マルチプロセッシング実装
import time import multiprocessing def cpu_bound(number): return sum(i * i for i in range(number)) def calculate_sums(numbers): with multiprocessing.Pool() as pool: pool.map(cpu_bound, numbers) def main(): start_time = time.perf_counter() numbers = [10000000 + x for x in range(20)] calculate_sums(numbers) end_time = time.perf_counter() print('Calculation takes {} seconds'.format(end_time - start_time)) if __name__ == '__main__': main()
実行時間はCalculation takes 5.024221667 seconds
です
concurrent.futures.ProcessPoolExecutor
とmultiprocessing
はどちらも、Pythonでマルチプロセス並行性を実装するためのライブラリです。いくつかの違いがあります。
- インターフェースベースのカプセル化:
concurrent.futures.ProcessPoolExecutor
は、concurrent.futures
モジュールによって提供される高レベルのインターフェースです。基盤となるマルチプロセス関数をカプセル化し、マルチプロセスのコードを簡単に記述できるようにします。一方、multiprocessing
はPythonの標準ライブラリの1つであり、完全なマルチプロセスサポートを提供し、プロセスを直接操作できます。 - APIの使用法:
concurrent.futures.ProcessPoolExecutor
の使用法は、スレッドプールの使用法と似ています。実行のために呼び出し可能なオブジェクト(関数など)をプロセスプールに送信し、実行結果を取得するために使用できるFuture
オブジェクトを返します。multiprocessing
は、より低レベルのプロセス管理および通信インターフェースを提供します。プロセスを明示的に作成、開始、および制御でき、キューまたはパイプを使用して複数のプロセス間の通信を行うことができます。 - スケーラビリティと柔軟性:
multiprocessing
は、より低レベルのインターフェースを提供するため、concurrent.futures.ProcessPoolExecutor
と比較してより柔軟性があります。プロセスを直接操作することにより、プロセス優先度の設定やプロセス間でのデータ共有など、各プロセスのよりきめ細かい制御を実現できます。concurrent.futures.ProcessPoolExecutor
は、単純なタスクの並列化に適しており、多くの基盤となる詳細を隠し、マルチプロセスのコードを簡単に記述できるようにします。 - クロスプラットフォームサポート:
concurrent.futures.ProcessPoolExecutor
とmultiprocessing
の両方がクロスプラットフォームのマルチプロセスサポートを提供し、さまざまなオペレーティングシステムで使用できます。
要するに、concurrent.futures.ProcessPoolExecutor
は、基盤となるマルチプロセス関数をカプセル化する高レベルのインターフェースであり、単純なマルチプロセス タスク並列化に適しています。multiprocessing
は、より低レベルのライブラリであり、より多くの制御と柔軟性を提供し、プロセスのきめ細かい制御が必要なシナリオに適しています。特定の要件に応じて適切なライブラリを選択する必要があります。単純なタスク並列化の場合は、concurrent.futures.ProcessPoolExecutor
を使用してコードを簡略化できます。より低レベルの制御と通信が必要な場合は、multiprocessing
ライブラリを使用できます。
まとめ
マルチスレッドとは異なり、asyncio
はシングルスレッドですが、その内部イベントループのメカニズムにより、複数の異なるタスクを同時に実行でき、マルチスレッドよりも自律的な制御が可能です。
asyncio
のタスクは、動作中に中断されることはないため、競合状態は発生しません。
特にI/O操作が多いシナリオでは、asyncio
はマルチスレッドよりも高い動作効率を発揮します。これは、asyncio
でのタスク切り替えのコストがスレッド切り替えのコストよりもはるかに小さく、asyncio
が開始できるタスクの数がマルチスレッドのスレッド数よりもはるかに多いためです。
ただし、多くの場合、asyncio
を使用するには、前の例のaiohttp
など、特定のサードパーティライブラリのサポートが必要であることに注意してください。また、I/O操作が高速で重くない場合は、マルチスレッドを使用しても問題を効果的に解決できます。
asyncio
は、非同期プログラミングを実装するためのPythonライブラリです。- コルーチンは
asyncio
の中核となる概念であり、async
キーワードとawait
キーワードを介して非同期操作を実現します。 asyncio
は、非同期I/O操作のための強力なAPIを提供し、I/O集中型のタスクを簡単に処理できます。asyncio.gather()
などのメカニズムを介して、複数のコルーチンタスクを並行して実行できます。
Leapcell: FastAPI、Flask、その他のPythonアプリケーションに最適なプラットフォーム
最後に、Flask/FastAPIのデプロイに最適なプラットフォームLeapcellを紹介させてください。
Leapcellは、最新の分散アプリケーション向けに特別に設計されたクラウドコンピューティングプラットフォームです。従量課金制の価格モデルにより、アイドルコストが発生しないため、ユーザーは実際に使用するリソースに対してのみ支払います。
- 多言語サポート
- JavaScript、Python、Go、またはRustでの開発をサポートします。
- 無制限のプロジェクトの無料デプロイメント
- 使用量に基づいてのみ課金されます。リクエストがない場合は課金されません。
- 比類のない費用対効果
- 従量課金制で、アイドル料金はかかりません。
- たとえば、$25で694万件のリクエストをサポートでき、平均応答時間は60ミリ秒です。
- 簡素化された開発者エクスペリエンス
- セットアップが簡単な直感的なユーザーインターフェース。
- 完全に自動化されたCI/CDパイプラインとGitOps統合。
- リアルタイムのメトリックとログにより、実用的な洞察が得られます。
- 簡単なスケーラビリティと高性能
- 高い同時実行性を容易に処理するための自動スケーリング。
- 運用 overhead がゼロであるため、開発者は開発に集中できます。
ドキュメントで詳細をご覧ください。 Leapcell Twitter: https://x.com/LeapcellHQ