コンテンツにスキップ

高度な使い方

このガイドでは、高度な非同期アプリケーションを構築するためのmtaioの高度な機能とパターンについて説明します。

高度なイベントパターン

イベントパイプライン

イベントとデータパイプラインを組み合わせて複雑なワークフローを作成:

from mtaio.events import EventEmitter
from mtaio.data import Pipeline, Stage

class ValidationStage(Stage[dict, dict]):
    async def process(self, data: dict) -> dict:
        if "user_id" not in data:
            raise ValueError("user_idが見つかりません")
        return data

class EnrichmentStage(Stage[dict, dict]):
    async def process(self, data: dict) -> dict:
        data["timestamp"] = time.time()
        return data

async def setup_event_pipeline():
    pipeline = Pipeline()
    emitter = EventEmitter()

    # パイプラインの設定
    pipeline.add_stage(ValidationStage())
    pipeline.add_stage(EnrichmentStage())

    @emitter.on("user_action")
    async def handle_user_action(event):
        async with pipeline:
            processed_data = await pipeline.process(event.data)
            await emitter.emit("processed_action", processed_data)

    return emitter

イベントのフィルタリングと変換

from mtaio.events import EventEmitter, Event

async def setup_event_processing():
    emitter = EventEmitter()

    # イベントのフィルタリング
    filtered = emitter.filter(
        lambda event: event.data.get("priority") == "high"
    )

    # イベントの変換
    transformed = emitter.map(
        lambda event: Event(
            event.name,
            {**event.data, "processed": True}
        )
    )

    # 複数のエミッターを連鎖
    emitter.pipe(filtered)
    filtered.pipe(transformed)

高度なキャッシュ戦略

キャッシュの階層化

最適なパフォーマンスのためのマルチレベルキャッシュの実装:

from mtaio.cache import TTLCache, DistributedCache
from typing import Optional, TypeVar, Generic

T = TypeVar("T")

class LayeredCache(Generic[T]):
    def __init__(self):
        self.local = TTLCache[T](default_ttl=60.0)  # 1分のローカルキャッシュ
        self.distributed = DistributedCache[T]([
            ("localhost", 5000),
            ("localhost", 5001)
        ])

    async def get(self, key: str) -> Optional[T]:
        # まずローカルキャッシュを試行
        value = await self.local.get(key)
        if value is not None:
            return value

        # 分散キャッシュを試行
        value = await self.distributed.get(key)
        if value is not None:
            # ローカルキャッシュを更新
            await self.local.set(key, value)
            return value

        return None

    async def set(self, key: str, value: T) -> None:
        # 両方のキャッシュを更新
        await self.local.set(key, value)
        await self.distributed.set(key, value)

キャッシュ無効化パターン

from mtaio.cache import TTLCache
from mtaio.events import EventEmitter

class CacheInvalidator:
    def __init__(self):
        self.cache = TTLCache[str]()
        self.emitter = EventEmitter()

        @self.emitter.on("data_updated")
        async def invalidate_cache(event):
            keys = event.data.get("affected_keys", [])
            for key in keys:
                await self.cache.delete(key)

            if event.data.get("clear_all", False):
                await self.cache.clear()

    async def update_data(self, key: str, value: str) -> None:
        await self.cache.set(key, value)
        await self.emitter.emit("data_updated", {
            "affected_keys": [key]
        })

高度なリソース管理

カスタムリソースリミッター

特定のニーズに合わせたリソースリミッターの作成:

from mtaio.resources import ResourceLimiter
from mtaio.typing import AsyncFunc
from typing import Dict

class AdaptiveRateLimiter(ResourceLimiter):
    def __init__(self):
        self.rates: Dict[str, float] = {}
        self._current_load = 0.0

    async def acquire(self, resource_id: str) -> None:
        rate = self.rates.get(resource_id, 1.0)
        if self._current_load > 0.8:  # 80%の負荷
            rate *= 0.5  # レートを削減
        await super().acquire(tokens=1/rate)

    def adjust_rate(self, resource_id: str, load: float) -> None:
        self._current_load = load
        if load > 0.9:  # 高負荷
            self.rates[resource_id] *= 0.8
        elif load < 0.5:  # 低負荷
            self.rates[resource_id] *= 1.2

複雑なタイムアウトパターン

from mtaio.resources import TimeoutManager
from contextlib import asynccontextmanager

class TimeoutController:
    def __init__(self):
        self.timeouts = TimeoutManager()

    @asynccontextmanager
    async def cascading_timeout(self, timeouts: list[float]):
        """段階的なタイムアウトとフォールバック動作を実装"""
        for timeout in timeouts:
            try:
                async with self.timeouts.timeout(timeout):
                    yield
                break
            except TimeoutError:
                if timeout == timeouts[-1]:
                    raise
                continue

高度なデータ処理

カスタムパイプラインステージ

複雑なデータ変換のための特殊なパイプラインステージの作成:

from mtaio.data import Pipeline, Stage
from typing import Any, AsyncIterator

class BatchProcessingStage(Stage[Any, Any]):
    def __init__(self, batch_size: int):
        self.batch_size = batch_size
        self.batch = []

    async def process(self, item: Any) -> AsyncIterator[Any]:
        self.batch.append(item)

        if len(self.batch) >= self.batch_size:
            result = await self._process_batch(self.batch)
            self.batch = []
            return result

    async def _process_batch(self, batch: list[Any]) -> Any:
        # バッチ処理ロジックの実装
        return batch

ストリーム処理

複雑なストリーム処理パターンの実装:

from mtaio.data import Stream
from typing import TypeVar, AsyncIterator

T = TypeVar("T")

class StreamProcessor(Stream[T]):
    async def window(
        self,
        size: int,
        slide: int = 1
    ) -> AsyncIterator[list[T]]:
        """スライディングウィンドウの実装"""
        buffer: list[T] = []

        async for item in self:
            buffer.append(item)
            if len(buffer) >= size:
                yield buffer[-size:]
                buffer = buffer[slide:]

    async def batch_by_time(
        self,
        seconds: float
    ) -> AsyncIterator[list[T]]:
        """時間ベースのバッチ処理"""
        batch: list[T] = []
        start_time = time.monotonic()

        async for item in self:
            batch.append(item)
            if time.monotonic() - start_time >= seconds:
                yield batch
                batch = []
                start_time = time.monotonic()

高度なモニタリング

カスタムメトリクス収集

from mtaio.monitoring import ResourceMonitor
from dataclasses import dataclass

@dataclass
class CustomMetrics:
    request_count: int = 0
    error_count: int = 0
    average_response_time: float = 0.0

class ApplicationMonitor(ResourceMonitor):
    def __init__(self):
        super().__init__()
        self.metrics = CustomMetrics()

    async def collect_metrics(self) -> None:
        while True:
            stats = await self.get_current_stats()

            # カスタムメトリクスの更新
            self.metrics.average_response_time = (
                stats.latency_sum / stats.request_count
                if stats.request_count > 0 else 0.0
            )

            # 必要に応じてアラートを発行
            if self.metrics.error_count > 100:
                await self.alert("高いエラー率が検出されました")

            await asyncio.sleep(60)  # 1分ごとに収集

本番環境のベストプラクティス

エラーリカバリー

from mtaio.core import TaskExecutor
from mtaio.exceptions import MTAIOError

class ResilientExecutor:
    def __init__(self):
        self.executor = TaskExecutor()
        self.retry_count = 3

    async def execute_with_recovery(
        self,
        func: AsyncFunc[T],
        *args: Any,
        **kwargs: Any
    ) -> T:
        for attempt in range(self.retry_count):
            try:
                return await self.executor.run(
                    func(*args, **kwargs)
                )
            except MTAIOError as e:
                if attempt == self.retry_count - 1:
                    raise
                await asyncio.sleep(2 ** attempt)  # 指数バックオフ

リソースのクリーンアップ

from mtaio.resources import ResourceManager
from typing import AsyncIterator

class ManagedResources:
    def __init__(self):
        self.resources: list[AsyncCloseable] = []

    async def acquire(self, resource: AsyncCloseable) -> None:
        self.resources.append(resource)

    async def cleanup(self) -> None:
        while self.resources:
            resource = self.resources.pop()
            await resource.close()

    @asynccontextmanager
    async def resource_scope(self) -> AsyncIterator[None]:
        try:
            yield
        finally:
            await self.cleanup()

次のステップ