Web APIへのリクエスト、データベースクエリ、ファイルI/O—これらの操作は待ち時間が発生します。asyncioを使えば、その待ち時間を有効活用し、複数の操作を効率的に並行処理できます。
なぜasyncioなのか
同期処理の問題
import time
def fetch_data(url: str) -> str:
time.sleep(1) # ネットワーク待ち時間をシミュレート
return f"Data from {url}"
# 3つのURLを順次取得 = 3秒かかる
urls = ["url1", "url2", "url3"]
for url in urls:
print(fetch_data(url))
非同期処理の利点
import asyncio
async def fetch_data(url: str) -> str:
await asyncio.sleep(1) # 非同期で待機
return f"Data from {url}"
async def main():
urls = ["url1", "url2", "url3"]
# 3つを並行実行 = 約1秒で完了
results = await asyncio.gather(
*[fetch_data(url) for url in urls]
)
for result in results:
print(result)
asyncio.run(main())
コルーチンの基本
async/await
import asyncio
# async defでコルーチン関数を定義
async def greet(name: str) -> str:
# awaitで非同期操作を待機
await asyncio.sleep(0.5)
return f"Hello, {name}!"
# コルーチンを実行
async def main():
result = await greet("Python")
print(result)
# イベントループで実行
asyncio.run(main())
コルーチンオブジェクト
async def example():
return 42
# コルーチン関数を呼び出すとコルーチンオブジェクトを返す
coro = example()
print(type(coro)) # <class 'coroutine'>
# awaitで結果を取得
async def main():
result = await example()
print(result) # 42
Task: 並行実行の単位
Taskの作成
import asyncio
async def long_operation(name: str, seconds: int) -> str:
print(f"{name} started")
await asyncio.sleep(seconds)
print(f"{name} finished")
return f"{name} result"
async def main():
# Taskを作成(即座にスケジュール)
task1 = asyncio.create_task(long_operation("Task1", 2))
task2 = asyncio.create_task(long_operation("Task2", 1))
# 両方の完了を待つ
result1 = await task1
result2 = await task2
print(f"Results: {result1}, {result2}")
asyncio.run(main())
# 出力:
# Task1 started
# Task2 started
# Task2 finished
# Task1 finished
# Results: Task1 result, Task2 result
Taskのキャンセル
async def cancellable_task():
try:
while True:
print("Working...")
await asyncio.sleep(1)
except asyncio.CancelledError:
print("Task was cancelled")
raise # 再送出して正しくキャンセル
async def main():
task = asyncio.create_task(cancellable_task())
await asyncio.sleep(3) # 3秒待機
task.cancel() # キャンセル
try:
await task
except asyncio.CancelledError:
print("Caught cancellation")
asyncio.run(main())
gather: 複数コルーチンの集約
import asyncio
async def fetch(url: str, delay: float) -> dict:
await asyncio.sleep(delay)
return {"url": url, "status": 200}
async def main():
# 複数のコルーチンを並行実行
results = await asyncio.gather(
fetch("api/users", 1.0),
fetch("api/posts", 0.5),
fetch("api/comments", 0.8),
)
for result in results:
print(result)
asyncio.run(main())
エラーハンドリング
async def might_fail(should_fail: bool):
await asyncio.sleep(0.1)
if should_fail:
raise ValueError("Something went wrong")
return "Success"
async def main():
# return_exceptions=Trueで例外も結果として返す
results = await asyncio.gather(
might_fail(False),
might_fail(True),
might_fail(False),
return_exceptions=True
)
for result in results:
if isinstance(result, Exception):
print(f"Error: {result}")
else:
print(f"Result: {result}")
asyncio.run(main())
asyncio.wait: より細かい制御
import asyncio
async def task(name: str, delay: float):
await asyncio.sleep(delay)
return f"{name} done"
async def main():
tasks = [
asyncio.create_task(task("A", 1)),
asyncio.create_task(task("B", 2)),
asyncio.create_task(task("C", 0.5)),
]
# 最初の1つが完了するまで待機
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
for t in done:
print(f"Completed: {t.result()}")
print(f"Still pending: {len(pending)}")
# 残りも待機
done2, _ = await asyncio.wait(pending)
for t in done2:
print(f"Completed: {t.result()}")
asyncio.run(main())
タイムアウト
wait_for
async def slow_operation():
await asyncio.sleep(10)
return "Done"
async def main():
try:
result = await asyncio.wait_for(
slow_operation(),
timeout=2.0
)
except asyncio.TimeoutError:
print("Operation timed out!")
asyncio.run(main())
timeout(Python 3.11+)
async def main():
async with asyncio.timeout(2.0):
await slow_operation()
非同期コンテキストマネージャ
class AsyncResource:
async def __aenter__(self):
print("Acquiring resource")
await asyncio.sleep(0.1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Releasing resource")
await asyncio.sleep(0.1)
async def main():
async with AsyncResource() as resource:
print("Using resource")
asyncio.run(main())
非同期イテレータ
class AsyncCounter:
def __init__(self, max_count: int):
self.max_count = max_count
self.count = 0
def __aiter__(self):
return self
async def __anext__(self) -> int:
if self.count >= self.max_count:
raise StopAsyncIteration
await asyncio.sleep(0.1)
self.count += 1
return self.count
async def main():
async for num in AsyncCounter(5):
print(num)
asyncio.run(main())
非同期ジェネレータ
async def async_range(start: int, stop: int):
for i in range(start, stop):
await asyncio.sleep(0.1)
yield i
async def main():
async for num in async_range(1, 5):
print(num)
asyncio.run(main())
セマフォによる並行数制限
import asyncio
async def fetch_with_limit(
semaphore: asyncio.Semaphore,
url: str
) -> str:
async with semaphore:
print(f"Fetching {url}")
await asyncio.sleep(1)
return f"Result from {url}"
async def main():
# 同時実行数を3に制限
semaphore = asyncio.Semaphore(3)
urls = [f"url{i}" for i in range(10)]
tasks = [
fetch_with_limit(semaphore, url)
for url in urls
]
results = await asyncio.gather(*tasks)
print(f"Fetched {len(results)} URLs")
asyncio.run(main())
実践パターン
HTTP並行リクエスト
import asyncio
import aiohttp
async def fetch_url(
session: aiohttp.ClientSession,
url: str
) -> dict:
async with session.get(url) as response:
return {
"url": url,
"status": response.status,
"data": await response.text()
}
async def main():
urls = [
"https://api.example.com/users",
"https://api.example.com/posts",
"https://api.example.com/comments",
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for result in results:
print(f"{result['url']}: {result['status']}")
asyncio.run(main())
プロデューサー・コンシューマー
import asyncio
async def producer(queue: asyncio.Queue, n: int):
for i in range(n):
await asyncio.sleep(0.1)
await queue.put(f"item-{i}")
print(f"Produced: item-{i}")
await queue.put(None) # 終了シグナル
async def consumer(queue: asyncio.Queue, name: str):
while True:
item = await queue.get()
if item is None:
await queue.put(None) # 他のコンシューマーに伝播
break
print(f"{name} consumed: {item}")
await asyncio.sleep(0.2)
async def main():
queue = asyncio.Queue()
await asyncio.gather(
producer(queue, 10),
consumer(queue, "Consumer-1"),
consumer(queue, "Consumer-2"),
)
asyncio.run(main())
まとめ
asyncioの主要コンポーネント:
flowchart TB
subgraph Core["コア概念"]
C1["async/await"]
C2["コルーチン"]
C3["イベントループ"]
end
subgraph Execution["実行制御"]
E1["Task"]
E2["gather"]
E3["wait"]
end
subgraph Patterns["パターン"]
P1["Semaphore"]
P2["Queue"]
P3["timeout"]
end
Core --> Execution --> Patterns
style Core fill:#3b82f6,color:#fff
style Execution fill:#8b5cf6,color:#fff
style Patterns fill:#22c55e,color:#fff
| 関数/クラス | 用途 |
|---|---|
asyncio.run() |
メインエントリーポイント |
asyncio.create_task() |
Task作成・スケジュール |
asyncio.gather() |
複数コルーチンを並行実行 |
asyncio.wait() |
完了条件を細かく制御 |
asyncio.Semaphore |
並行数を制限 |
asyncio.Queue |
非同期キュー |
主要な原則:
- I/Oバウンドに使用: CPU負荷の高い処理には向かない
- awaitを忘れない: コルーチンはawaitしないと実行されない
- Taskで並行化: 並行実行したい場合はTaskを作成
- 適切な並行数: セマフォで同時実行数を制限
asyncioは、I/O待ちの多いアプリケーションのスループットを劇的に向上させます。