Advanced Usage¶
This guide covers advanced features and patterns in mtaio for building sophisticated asynchronous applications.
Advanced Event Patterns¶
Event Pipelines¶
Combine events with data pipelines for complex workflows:
import time

from mtaio.events import EventEmitter
from mtaio.data import Pipeline, Stage

class ValidationStage(Stage[dict, dict]):
    async def process(self, data: dict) -> dict:
        # Reject events that lack the required user_id field
        if "user_id" not in data:
            raise ValueError("Missing user_id")
        return data

class EnrichmentStage(Stage[dict, dict]):
    async def process(self, data: dict) -> dict:
        # Stamp each event with the time it was processed
        data["timestamp"] = time.time()
        return data
async def setup_event_pipeline():
    pipeline = Pipeline()
    emitter = EventEmitter()

    # Configure pipeline
    pipeline.add_stage(ValidationStage())
    pipeline.add_stage(EnrichmentStage())

    @emitter.on("user_action")
    async def handle_user_action(event):
        async with pipeline:
            processed_data = await pipeline.process(event.data)
            await emitter.emit("processed_action", processed_data)

    return emitter
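
A minimal driver for the pipeline above, as a sketch: the event names match the handlers registered in setup_event_pipeline, and the payload is illustrative.

import asyncio

async def main():
    emitter = await setup_event_pipeline()

    @emitter.on("processed_action")
    async def log_processed(event):
        # Receives the validated, timestamped payload
        print(event.data)

    await emitter.emit("user_action", {"user_id": 42})

asyncio.run(main())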
Event Filtering and Transformation¶
from mtaio.events import EventEmitter, Event

async def setup_event_processing():
    emitter = EventEmitter()

    # Filter events
    filtered = emitter.filter(
        lambda event: event.data.get("priority") == "high"
    )

    # Transform events
    transformed = emitter.map(
        lambda event: Event(
            event.name,
            {**event.data, "processed": True}
        )
    )

    # Chain multiple emitters
    emitter.pipe(filtered)
    filtered.pipe(transformed)
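
Handlers can then be attached to the derived emitter inside setup_event_processing; with the chain above they see only high-priority events, already tagged by the map step. A sketch, assuming pipe forwards events downstream as shown:

@transformed.on("user_action")
async def handle_high_priority(event):
    # Only events with priority == "high", tagged processed=True, arrive here
    print(event.data)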
Advanced Caching Strategies¶
Cache Layering¶
Layer a fast in-process cache over a shared distributed cache, so hot keys are served from memory and the distributed layer handles misses:
from mtaio.cache import TTLCache, DistributedCache
from typing import Optional, TypeVar, Generic

T = TypeVar("T")

class LayeredCache(Generic[T]):
    def __init__(self):
        self.local = TTLCache[T](default_ttl=60.0)  # 1 minute local cache
        self.distributed = DistributedCache[T]([
            ("localhost", 5000),
            ("localhost", 5001)
        ])

    async def get(self, key: str) -> Optional[T]:
        # Try local cache first
        value = await self.local.get(key)
        if value is not None:
            return value

        # Try distributed cache
        value = await self.distributed.get(key)
        if value is not None:
            # Update local cache
            await self.local.set(key, value)
            return value

        return None

    async def set(self, key: str, value: T) -> None:
        # Update both caches
        await self.local.set(key, value)
        await self.distributed.set(key, value)
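
A read-through usage sketch; load_profile_from_db is a hypothetical loader standing in for whatever backing store you use:

async def get_profile(cache: LayeredCache[str], user_id: str) -> str:
    key = f"profile:{user_id}"
    profile = await cache.get(key)
    if profile is None:
        profile = await load_profile_from_db(user_id)  # hypothetical loader
        await cache.set(key, profile)
    return profile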
Cache Invalidation Patterns¶
from mtaio.cache import TTLCache
from mtaio.events import EventEmitter

class CacheInvalidator:
    def __init__(self):
        self.cache = TTLCache[str]()
        self.emitter = EventEmitter()

        # Register the invalidation handler on this instance's emitter
        @self.emitter.on("data_updated")
        async def invalidate_cache(event):
            keys = event.data.get("affected_keys", [])
            for key in keys:
                await self.cache.delete(key)

            if event.data.get("clear_all", False):
                await self.cache.clear()

    async def update_data(self, key: str, value: str) -> None:
        await self.cache.set(key, value)
        await self.emitter.emit("data_updated", {
            "affected_keys": [key]
        })
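
Run inside an async context; the keys and payloads are illustrative:

invalidator = CacheInvalidator()
await invalidator.update_data("config:theme", "dark")

# A bulk change can flush the whole cache through the same event
await invalidator.emitter.emit("data_updated", {"clear_all": True})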
Advanced Resource Management¶
Custom Resource Limiters¶
Create specialized resource limiters, for example one that adapts its rate to the current system load:
from mtaio.resources import ResourceLimiter
from typing import Dict

class AdaptiveRateLimiter(ResourceLimiter):
    def __init__(self):
        super().__init__()
        self.rates: Dict[str, float] = {}
        self._current_load = 0.0

    async def acquire(self, resource_id: str) -> None:
        rate = self.rates.get(resource_id, 1.0)
        if self._current_load > 0.8:  # 80% load
            rate *= 0.5  # Reduce rate
        await super().acquire(tokens=1 / rate)

    def adjust_rate(self, resource_id: str, load: float) -> None:
        self._current_load = load
        current = self.rates.get(resource_id, 1.0)
        if load > 0.9:  # High load
            self.rates[resource_id] = current * 0.8
        elif load < 0.5:  # Low load
            self.rates[resource_id] = current * 1.2
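
A sketch of the limiter guarding a downstream call; in practice the load figure fed to adjust_rate would come from your monitoring rather than a constant:

limiter = AdaptiveRateLimiter()

async def call_backend(resource_id: str) -> None:
    await limiter.acquire(resource_id)
    ...  # perform the rate-limited operation

limiter.adjust_rate("backend", load=0.95)  # e.g. fed from a monitor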
Complex Timeout Patterns¶
from mtaio.resources import TimeoutManager

class TimeoutController:
    def __init__(self):
        self.timeouts = TimeoutManager()

    async def run_with_cascading_timeouts(self, func, timeouts: list[float]):
        """Retries func with cascading timeouts, falling back to the next
        timeout on expiry; the final TimeoutError propagates to the caller."""
        # Wraps a callable rather than exposing a context manager: a context
        # manager body cannot be re-entered once a timeout is thrown into it.
        for timeout in timeouts:
            try:
                async with self.timeouts.timeout(timeout):
                    return await func()
            except TimeoutError:
                if timeout == timeouts[-1]:
                    raise
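
Usage sketch, inside an async context; fetch_data is a hypothetical coroutine function:

controller = TimeoutController()

async def fetch_data():
    ...  # some slow operation

result = await controller.run_with_cascading_timeouts(
    fetch_data,
    timeouts=[1.0, 5.0, 30.0],
)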
Advanced Data Processing¶
Custom Pipeline Stages¶
Create specialized pipeline stages for complex data transformations:
from mtaio.data import Pipeline, Stage
from typing import Any, Optional

class BatchProcessingStage(Stage[Any, Any]):
    def __init__(self, batch_size: int):
        self.batch_size = batch_size
        self.batch: list[Any] = []

    async def process(self, item: Any) -> Optional[list[Any]]:
        # Accumulate items; emit a batch only once it is full
        self.batch.append(item)
        if len(self.batch) >= self.batch_size:
            result = await self._process_batch(self.batch)
            self.batch = []
            return result
        return None

    async def _process_batch(self, batch: list[Any]) -> list[Any]:
        # Implement batch processing logic
        return batch
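
The stage plugs into a pipeline exactly like the earlier examples; note that downstream stages must tolerate the None emitted while a batch is still filling:

pipeline = Pipeline()
pipeline.add_stage(BatchProcessingStage(batch_size=100))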
Stream Processing¶
Implement complex stream processing patterns:
import time

from mtaio.data import Stream
from typing import TypeVar, AsyncIterator

T = TypeVar("T")

class StreamProcessor(Stream[T]):
    async def window(
        self,
        size: int,
        slide: int = 1
    ) -> AsyncIterator[list[T]]:
        """Sliding window implementation"""
        buffer: list[T] = []
        async for item in self:
            buffer.append(item)
            if len(buffer) >= size:
                yield buffer[-size:]
                buffer = buffer[slide:]

    async def batch_by_time(
        self,
        seconds: float
    ) -> AsyncIterator[list[T]]:
        """Time-based batching"""
        batch: list[T] = []
        start_time = time.monotonic()
        async for item in self:
            batch.append(item)
            if time.monotonic() - start_time >= seconds:
                yield batch
                batch = []
                start_time = time.monotonic()
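
Consuming the processor is plain async iteration; here a numeric stream is averaged per sliding window:

async def print_window_averages(stream: StreamProcessor[float]) -> None:
    async for window in stream.window(size=10, slide=5):
        print(sum(window) / len(window))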
Advanced Monitoring¶
Custom Metrics Collection¶
import asyncio

from mtaio.monitoring import ResourceMonitor
from dataclasses import dataclass

@dataclass
class CustomMetrics:
    request_count: int = 0
    error_count: int = 0
    average_response_time: float = 0.0

class ApplicationMonitor(ResourceMonitor):
    def __init__(self):
        super().__init__()
        self.metrics = CustomMetrics()

    async def collect_metrics(self) -> None:
        while True:
            stats = await self.get_current_stats()

            # Update custom metrics
            self.metrics.average_response_time = (
                stats.latency_sum / stats.request_count
                if stats.request_count > 0 else 0.0
            )

            # Emit alerts if needed
            if self.metrics.error_count > 100:
                await self.alert("High error rate detected")

            await asyncio.sleep(60)  # Collect every minute
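
The collector is typically started as a background task inside the application's async entry point; a sketch:

async def main():
    monitor = ApplicationMonitor()
    collector = asyncio.create_task(monitor.collect_metrics())
    try:
        ...  # run the application
    finally:
        collector.cancel()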
Production Best Practices¶
Error Recovery¶
import asyncio

from mtaio.core import TaskExecutor
from mtaio.exceptions import MTAIOError
from mtaio.typing import AsyncFunc
from typing import Any, TypeVar

T = TypeVar("T")

class ResilientExecutor:
    def __init__(self):
        self.executor = TaskExecutor()
        self.retry_count = 3

    async def execute_with_recovery(
        self,
        func: AsyncFunc[T],
        *args: Any,
        **kwargs: Any
    ) -> T:
        for attempt in range(self.retry_count):
            try:
                return await self.executor.run(
                    func(*args, **kwargs)
                )
            except MTAIOError:
                if attempt == self.retry_count - 1:
                    raise
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
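
Usage sketch, inside an async context; flaky_fetch is a hypothetical coroutine function that may raise MTAIOError:

executor = ResilientExecutor()
result = await executor.execute_with_recovery(flaky_fetch, "https://example.com")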
Resource Cleanup¶
from contextlib import asynccontextmanager
from typing import AsyncIterator

from mtaio.typing import AsyncCloseable  # any object with an async close()

class ManagedResources:
    def __init__(self):
        self.resources: list[AsyncCloseable] = []

    async def acquire(self, resource: AsyncCloseable) -> None:
        self.resources.append(resource)

    async def cleanup(self) -> None:
        # Close in reverse acquisition order
        while self.resources:
            resource = self.resources.pop()
            await resource.close()

    @asynccontextmanager
    async def resource_scope(self) -> AsyncIterator[None]:
        try:
            yield
        finally:
            await self.cleanup()
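
A usage sketch, inside an async context; open_connection stands in for anything exposing an async close():

resources = ManagedResources()

async with resources.resource_scope():
    conn = await open_connection()  # hypothetical resource
    await resources.acquire(conn)
    ...  # use conn; it is closed automatically when the scope exits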
Next Steps¶
- Check out our Example Applications
- Review the API Reference
- Join our Community Discussions