Pythonタスクに適した並行処理モデルの選択
Daniel Hayes
Full-Stack Engineer · Leapcell

はじめに
ソフトウェア開発の世界では、応答性と効率性が最優先されます。Webサーバーの構築、大規模データセットの処理、インターネットからの情報スクレイピングなど、アプリケーションが複数の操作を並行して処理できる能力は、そのパフォーマンスとユーザーエクスペリエンスに大きく影響します。Pythonは、その豊富なエコシステムにより、multiprocessing
、threading
、asyncio
といった強力な並行処理モデルをいくつか提供しています。それぞれのニュアンス、そしてより重要なことには、いつどれを選択するかを知ることは、高性能なアプリケーションを書きたいPython開発者にとって不可欠なスキルです。この記事では、これらの並行処理モデルを解き明かし、その原則をガイドし、皆様が特定のユースケースに対して情報に基づいた意思決定を行えるよう支援します。
並行処理のコアコンセプト
各モデルの詳細に入る前に、Pythonにおける並行処理の基盤となるいくつかの基本的な概念を明確に理解しましょう。
並行処理 vs. 並列処理: 並行処理は一度に多くのことを「扱う」ことであり、並列処理は一度に多くのことを「実行する」ことです。シングルコアCPUは、タスク間で速く切り替える(コンテキストスイッチ)ことによって並行処理が可能になり、同時に実行されている「幻想」を与えます。一方、並列処理は、真に同時にタスクを実行するために複数の処理ユニット(CPUコア)を必要とします。
CPUバウンドタスク vs. I/Oバウンドタスク:
- CPUバウンドタスクは、計算処理にほとんどの時間を費やす操作であり、CPUの速度によって制限されます。例としては、重い数学的計算、画像処理、データ圧縮などが挙げられます。
- I/Oバウンドタスクは、ネットワークリクエスト、ディスクの読み書き、データベースクエリなど、外部リソースの応答を待つことにほとんどの時間を費やす操作です。この待機時間中、CPUはほとんどアイドル状態になります。
グローバルインタプリタロック(GIL): GILは、Pythonオブジェクトへのアクセスを保護するミューテックスであり、複数のネイティブスレッドが同時にPythonバイトコードを実行するのを防ぎます。これは、マルチコアプロセッサ上であっても、一度に1つのスレッドしかPythonバイトコードを実行できないことを意味します。GILはC拡張機能の開発とメモリ管理を簡素化しますが、単一のPythonプロセス内でのCPUバウンドタスクの真の並列実行を制限します。
スレッディング:共有メモリによる並行処理
threading
を使用すると、プログラムの複数の部分を同じプロセス内で並行して実行できます。スレッドは同じメモリ空間を共有するため、データ共有が簡単になりますが、適切に管理されない場合は競合状態やデッドロックといった課題も生じさせます。
仕組み
新しいスレッドを作成すると、それはメインスレッドと並行して別の関数を実行します。これらのスレッドのスケジューリングはオペレーティングシステムが管理します。
例
複数のURLからデータを取得するようなI/Oバウンドタスクを考えてみましょう。
import threading import requests import time def fetch_url(url): print(f"Starting to fetch {url}") try: response = requests.get(url, timeout=5) print(f"Finished fetching {url}: Status {response.status_code}") except requests.exceptions.RequestException as e: print(f"Error fetching {url}: {e}") urls = [ "https://www.google.com", "https://www.bing.com", "https://www.yahoo.com", "https://www.amazon.com", "https://www.wikipedia.org" ] start_time = time.time() threads = [] for url in urls: thread = threading.Thread(target=fetch_url, args=(url,))) threads.append(thread) thread.start()) for thread in threads: thread.join() # すべてのスレッドが完了するまで待機 end_time = time.time() print(f"All URLs fetched in {end_time - start_time:.2f} seconds using threading.")
スレッディングを使用するタイミング
threading
はI/Oバウンドタスクに最適です。GILは真のマルチコアCPU並列実行を妨げますが、スレッドがI/O操作(例:ネットワークデータの待機)を実行すると、GILが解放され、他のスレッドが実行できるようになります。これにより、外部リソースの待機を伴うタスクにthreading
が効果的になります。
逆に、CPUバウンドタスクの場合、GILのためにthreading
はほとんど、あるいは全くパフォーマンス上の利点をもたらさず、コンテキストスイッチのオーバーヘッドを導入して、プログラムをシングルスレッドアプローチよりも遅くする可能性さえあります。
マルチプロセッシング:分離されたプロセスによる真の並列性
multiprocessing
を使用すると、それぞれ独自のPythonインタプリタとメモリ空間を持つ新しいプロセスを生成できます。これは、GILが問題にならないことを意味し、複数のCPUコアにわたるCPUバウンドタスクの真の並列実行を可能にします。
仕組み
multiprocessing
を使用すると、新しいOSプロセスが作成されます。これらのプロセスはメモリを直接共有しないため、GILの制約を回避します。プロセス間の通信は、通常、パイプやキューなどの明示的なメカニズムを介して行われます。
例
multiprocessing
を実証するために、素数計算のようなCPUバウンドタスクを見てみましょう。
import multiprocessing import time def is_prime(n): if n < 2: return False for i in range(2, int(n**0.5) + 1): if n % i == 0: return False return True def find_primes_in_range(start, end): primes = [n for n in range(start, end) if is_prime(n)] # print(f"Found {len(primes)} primes between {start} and {end}") return primes if __name__ == "__main__": nums_to_check = range(1000000, 10000000) # より良いデモンストレーションのためのより大きな範囲 num_processes = multiprocessing.cpu_count() # CPUコア数と同じ数のプロセスを使用 chunk_size = len(nums_to_check) // num_processes chunks = [] for i in range(num_processes): start_idx = i * chunk_size end_idx = (i + 1) * chunk_size if i < num_processes - 1 else len(nums_to_check) chunks.append((nums_to_check[start_idx], nums_to_check[end_idx-1] + 1)) start_time = time.time() with multiprocessing.Pool(num_processes) as pool: all_primes = pool.starmap(find_primes_in_range, chunks) # リストのリストをフラット化 total_primes = [item for sublist in all_primes for item in sublist] end_time = time.time() print(f"Found {len(total_primes)} primes in {end_time - start_time:.2f} seconds using multiprocessing.") # 比較のため、シングルスレッド実行(実行するにはコメントを解除) # start_time_single = time.time() # single_primes = find_primes_in_range(nums_to_check[0], nums_to_check[-1] + 1) # end_time_single = time.time() # print(f"Found {len(single_primes)} primes in {end_time_single - start_time_single:.2f} seconds using single-thread.")
マルチプロセッシングを使用するタイミング
multiprocessing
はCPUバウンドタスクの決定版です。複数のCPUコアを活用することで、GILの制約を克服し、計算集約的な操作の真の並列実行を実現し、大幅な速度向上をもたらします。
I/Oバウンドタスクにも使用できますが、プロセスの作成と管理のオーバーヘッドは通常スレッドよりも高いため、そのようなシナリオではthreading
またはasyncio
の方が効率的な場合がよくあります。
Asyncio:高並行処理のための協調的マルチタスク
asyncio
は、async
/await
構文を使用して並行コードを記述するためのPythonのライブラリです。これは、単一スレッドを使用した「協調的マルチタスク」を可能にし、タスクはイベントループに自発的に制御を譲り、他のタスクの実行を許可します。これは、多数の同時I/O操作を効率的に処理するのに特に強力です。
仕組み
asyncio
はイベントループ上で動作します。await
式(通常はI/O操作)に遭遇すると、現在のタスクは一時停止し、制御はイベントループに戻ります。イベントループは、準備ができた他のタスクや外部イベント(ネットワーク応答など)をチェックし、それらをスケジュールします。待機中のI/O操作が完了すると、元のタスクが再開されます。
例
URL取得の例をasyncio
を使用して再度見てみましょう。
import asyncio import aiohttp # 非同期HTTPクライアント import time async def fetch_url_async(url, session): print(f"Starting to fetch {url}") try: async with session.get(url, timeout=5) as response: status = response.status print(f"Finished fetching {url}: Status {status}") return status except aiohttp.ClientError as e: print(f"Error fetching {url}: {e}") return None async def main(): urls = [ "https://www.google.com", "https://www.bing.com", "https://www.yahoo.com", "https://www.amazon.com", "https://www.wikipedia.org", "https://www.example.com", # より良いデモンストレーションのために追加 "https://www.test.org" ] start_time = time.time() async with aiohttp.ClientSession() as session: tasks = [fetch_url_async(url, session) for url in urls] results = await asyncio.gather(*tasks) # タスクを並列実行 end_time = time.time() print(f"All URLs fetched in {end_time - start_time:.2f} seconds using asyncio.") # print(f"Results: {results}") if __name__ == "__main__": asyncio.run(main())
Asyncioを使用するタイミング
asyncio
は、多数のスレッドやプロセスの作成オーバーヘッドなしに、非常に多数の同時接続または操作を管理する必要があるI/Oバウンドタスクに優れています。単一スレッド内で動作するため、コンテキストスイッチはスレッドよりもはるかに軽量であり、I/OへのGIL問題の影響を回避します。Webサーバー、データベースプロキシ、またはロングポーリングクライアントを考えてみてください。
CPUバウンドタスクには一般的に適していません。これは、単一のCPU集約的なタスクがイベントループ全体をブロックし、完了するまで他のすべての協調タスクの実行を防ぐためです。asyncio
アプリケーションでCPUバウンド操作を行う場合、通常はイベントループをブロックしないようにmultiprocessing.Pool
またはThreadPoolExecutor
にオフロードします。
適切なモデルの選択
簡単な概要と決定フレームワークを以下に示します。
- CPUバウンドタスク: **
multiprocessing
**を使用します。GILをバイパスし、計算集約的な操作の複数のコアにわたる真の並列実行を可能にします。 - I/Oバウンドタスク:
- 適度な数の同時操作、または非同期相当のものを持たないブロッキングI/Oライブラリを扱う場合、**
threading
**は良い選択です。多くの従来のI/Oシナリオでは、asyncio
よりも実装が簡単です。 - 非常に多数の同時I/O操作、特にネットワーク呼び出し、および非同期ライブラリ(
aiohttp
、asyncpg
など)を使用する場合、asyncio
は協調的マルチタスクとより低いオーバーヘッドにより、はるかに効率的です。
- 適度な数の同時操作、または非同期相当のものを持たないブロッキングI/Oライブラリを扱う場合、**
- 混合タスク(CPUバウンドおよびI/Oバウンド): 多くの場合、ハイブリッドアプローチが最適です。I/Oバウンド部分には
asyncio
を使用し、CPUバウンド計算をmultiprocessing.Pool
(asyncio
コンテキストでloop.run_in_executor
を使用)にオフロードして、イベントループをブロックしないようにします。
結論
Pythonは、並行アプリケーションを構築するための強力なツールを提供しており、それぞれに強みと理想的なユースケースがあります。Threading
はI/Oバウンドタスクと適度な並行処理に適しており、multiprocessing
は真の並列性を要求するCPUバウンドタスクのチャンピオンであり、asyncio
は非常に並行なI/Oバウンド操作にエレガントで効率的なソリューションを提供します。これらの区別を理解することで、開発者は最も適切な並行処理モデルを自信を持って選択し、Pythonアプリケーションが応答性があり、かつパフォーマンスを発揮することを保証できます。鍵は、並行処理モデルをタスクの性質に合わせることです。