【Python】非同期処理(asyncio, aiohttp)を理解する

【Python】非同期処理(asyncio, aiohttp)を理解する

1. Pythonの非同期処理とは

Pythonの非同期処理とは、時間のかかる処理を待っている間に、他の処理を先に進めるプログラミングの方法です。待ち時間がある処理を効率よく進めるときに特に効果を発揮します。

たとえば、次のような場合に、非同期処理を使うと効果を発揮します。

(1) Web APIの並列呼び出し

例:天気情報APIや路線情報API等に同時にリクエストを送りたいとき。
・APIの応答を待っている間は、CPUをほとんど使わないので、他のリクエスト処理ができる。
・非同期でリクエストするときは、requestsの代わりにaiohttpを使う。

(2) チャットアプリやWebSocket通信(リアルタイム処理)

例:Webチャット、オンラインゲームなどで、常時接続しながらメッセージの送受信を行う。
・接続は維持しつつ、メッセージが来るのを「待つ」必要があり、同期処理だと待っている間、他のことができなくなる。

(3) ファイルやデータベースへの非同期アクセス

例:数千件のログファイルを読み込みながら、バックグラウンドで解析・保存する。
・ファイルI/OはCPUよりディスクの速度がボトルネックになる。
aiofilesdatabasesライブラリを使うことで、I/O中に他の処理を進められる。

(4) Webクローラー(多数のページを巡回)

例:ニュースサイトや商品ページを一括で収集するスクリプト。
・ページごとにHTTPリクエストが必要で、1つずつ順番に処理していたら時間がかかる。
・非同期なら同時に多数のページへアクセスできる。

(5) GUIアプリやCLIツールの操作中バックグラウンド処理

例:ファイルコピー中に進行状況バーを表示したり、ユーザーの操作を受け付ける。
・ユーザー操作とバックグランド作業を同時に進めることができる。
・asyncioを使えば、UIイベントループを止めずに処理ができる。

2. Pythonで非同期処理を書いてみる

コード

import asyncio

async def greet(name):
    print(f"{name} に挨拶中...")
    await asyncio.sleep(1)
    print(f"{name} に挨拶完了!")

async def main():
    # 2つの処理を同時に実行
    await asyncio.gather(
        greet("Alice"),
        greet("Bob")
    )

asyncio.run(main())
  • async defで非同期関数(コルーチン)を定義
  • await asyncio.sleep(1)は、「他の処理に制御を渡しながら1秒待つ」
  • asyncio.gather()は複数の非同期関数を同時に実行

実行結果

$ python test.py 
Alice に挨拶中... 
# このメッセージ表示後、await asyncio.sleep(1)が実行されて1秒待つ。awaitが制御を手放すので、その間にgreet("Bob")が実行
Bob に挨拶中... 
# このメッセージ表示後、await asyncio.sleep(1)が実行されて1秒待つ。awaitが制御を手放す
Alice に挨拶完了! 
# 「Alice に挨拶中...」 表示後、1秒後に続きの処理が実行
Bob に挨拶完了! 
# 「Bob に挨拶中...」表示後、1秒後に続きの処理が実行

このように、awaitは「他のタスクにバトンを渡す」という意味合いがあります。

3. asyncio.gather()とasyncio.create_task()の使い方

複数の非同期処理を同時に進めるツールには、asyncio.gather()asyncio.create_task()があります。

(1) asyncio.gather()

概要

複数の非同期関数を同時に実行して、すべて終わるのを待つ

構文

await asyncio.gather(task(1), task(2), ...)

コード例

import asyncio

async def hello(name):
    print(f"{name} に挨拶中…")
    await asyncio.sleep(1)
    print(f"{name} に挨拶完了!")

async def main():
    await asyncio.gather(
        hello("Alice"),
        hello("Bob"),
        hello("Charlie")
    )

asyncio.run(main())

実行結果

Alice に挨拶中…
Bob に挨拶中…
Charlie に挨拶中…
Alice に挨拶完了!
Bob に挨拶完了!
Charlie に挨拶完了!

ほぼ同時に進行して終了します。

(2) asyncio.create_task()

概要

非同期関数の「実行タスクを作って、あとでawaitで回収」する

構文

task = asyncio.create_task(func())
...
await task

コード例

import asyncio

async def hello(name):
    print(f"{name} に挨拶中…")
    await asyncio.sleep(1)
    print(f"{name} に挨拶完了!")

async def main():
    task1 = asyncio.create_task(hello("Alice"))
    task2 = asyncio.create_task(hello("Bob"))

    print("タスクを開始しました")
    await task1
    await task2

asyncio.run(main())

実行結果

$ python test.py 
タスクを開始しました
Alice に挨拶中…
Bob に挨拶中…
Alice に挨拶完了!
Bob に挨拶完了!
  • create_task()で、コルーチンであるhello("Alice")hello("Bob")を「タスク」としてイベントループに登録する。この時点では、まだhello("Alice")の本体はまだ実行されていない
  • await task1,await task2で初めてイベントループがtask1とtask2を動かし始める

(3) gather()とcreate_task()の違いまとめ

比較点 gather() create_task()
処理開始 すぐに開始 すぐに開始
await 必須 その場でまとめて await 後から await できる
戻り値 結果をリストで返す タスクオブジェクトを返す
例外処理 全部の例外をまとめて扱う 個別のタスクごとに扱える
向いている用途 一括処理 タスクの生存管理・個別制御

4. 簡単な非同期処理プログラム

さて、ここでは「1. Pythonの非同期処理とは」の使用例としてご紹介した「(1) Web APIの並列呼び出し」を試してみたいと思います。

ライブラリaiohttpを使用するので、事前にインストールしておきます。

pip install aiohttp

コード全文

import asyncio
import aiohttp
import re

def sanitize_name(name):
    # 正規表現で英字のみ(a-zA-Z)以外は拒否
    if re.fullmatch(r"[a-zA-Z]+", name):
        return name
    else:
        raise ValueError("名前には英字のみを使用してください。")
    
# 1つのURLに対して非同期でGETリクエストを送る関数
async def fetch(session, url):
    async with session.get(url) as response:
        data = await response.text()
        print(f"取得完了: {url}")
        return data

# メイン関数:複数のURLを並列で取得
async def main(name):
    urls = [
        f"https://api.agify.io?name={name}",
        f"https://api.genderize.io?name={name}",
        f"https://api.nationalize.io?name={name}"
    ]

    async with aiohttp.ClientSession() as session:
        # fetch()を並列に実行
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    # 結果表示
    for result in results:
        print(result)

# 実行
if __name__ == "__main__":
    try:
        user_input = input("名前を英字で入力してください(例: michael):").strip()
        safe_name = sanitize_name(user_input)  # バリデーション
        asyncio.run(main(safe_name))
    except ValueError as ve:
        print(f"入力エラー: {ve}")
    except Exception as e:
        print(f"予期せぬエラーが発生しました: {e}")    

実行例

$ python test.py 
名前を英字で入力してください(例: michael):Connan
取得完了: https://api.genderize.io?name=Connan
取得完了: https://api.nationalize.io?name=Connan
取得完了: https://api.agify.io?name=Connan
{"count":54,"name":"Connan","age":61}
{"count":257,"name":"Connan","gender":"male","probability":0.94}
{"count":1531,"name":"Connan","country":[{"country_id":"FR","probability":0.4543036992858752},{"country_id":"NA","probability":0.036055849149672636},{"country_id":"AU","probability":0.03192209092824059},{"country_id":"ZA","probability":0.022353954332983168},{"country_id":"JP","probability":0.020869054060425923}]}
  • aiohttp.ClientSession() :セッションを共有することで効率的なHTTP通信を行う
  • async with:接続の開始・終了を非同期に管理
  • *tasks:リストを展開(アンパック)して、それぞれの要素を個別の引数として関数に渡す。
    つまり、
    await asyncio.gather( ["https://api.agify.io?name={name}", "https://api.genderize.io?name={name}", "https://api.nationalize.io?name={name}"])
    のように1つのリストを渡すのではなく、
    await asyncio.gather("https://api.agify.io?name={name}", "https://api.genderize.io?name={name}", "https://api.nationalize.io?name={name}")
    のように3つの引数を渡す

今回はPythonの非同期関数について取り上げてみました。