Events API Reference¶
The mtaio.events
module provides a robust event handling system for building event-driven applications.
EventEmitter¶
Basic Usage¶
from mtaio.events import EventEmitter
# Create emitter
emitter = EventEmitter()
# Define handler
@emitter.on("user_login")
async def handle_login(event):
user = event.data
print(f"User {user['name']} logged in")
# Emit event
await emitter.emit("user_login", {
"name": "John",
"id": "123"
})
Class Reference¶
class EventEmitter:
def __init__(self):
"""Initialize event emitter."""
def on(
self,
event_name: str,
priority: EventPriority = EventPriority.NORMAL
) -> Callable:
"""
Decorator to register an event handler.
Args:
event_name: Name of the event to handle
priority: Handler priority level
Returns:
Decorated event handler
"""
def once(
self,
event_name: str,
priority: EventPriority = EventPriority.NORMAL
) -> Callable:
"""Register one-time event handler."""
async def emit(
self,
event_name: str,
data: Any,
metadata: Optional[Dict[str, Any]] = None
) -> None:
"""
Emit an event.
Args:
event_name: Name of the event
data: Event data
metadata: Optional event metadata
"""
Event Types¶
Event¶
Base event container class.
from mtaio.events import Event, ChangeType
@dataclass
class Event[T]:
name: str # Event name
data: T # Event data
propagate: bool = True # Whether to continue propagation
metadata: Dict[str, Any] = field(default_factory=dict)
EventPriority¶
Priority levels for event handlers.
class EventPriority(Enum):
LOWEST = auto() # Lowest priority
LOW = auto() # Low priority
NORMAL = auto() # Normal priority
HIGH = auto() # High priority
HIGHEST = auto() # Highest priority
MONITOR = auto() # Monitoring only
Advanced Features¶
Event Filtering¶
Filter events based on conditions:
from mtaio.events import EventEmitter
emitter = EventEmitter()
# Create filtered emitter
filtered = emitter.filter(
lambda event: event.data.get("priority") == "high"
)
@filtered.on("alert")
async def handle_high_priority_alert(event):
alert = event.data
print(f"High priority alert: {alert['message']}")
Event Transformation¶
Transform events before handling:
from mtaio.events import EventEmitter, Event
# Create transformed emitter
transformed = emitter.map(
lambda event: Event(
name=event.name,
data={**event.data, "processed": True}
)
)
@transformed.on("data_event")
async def handle_processed_data(event):
# Handle transformed event
pass
Batch Operations¶
Group multiple events:
from mtaio.events import EventEmitter
emitter = EventEmitter()
async def batch_operations():
async with emitter.batch_operations():
# Events are batched
await emitter.emit("event1", data1)
await emitter.emit("event2", data2)
# Handlers are called once with all events
Event Channels¶
Create event channels for specific purposes:
from mtaio.events import Channel, Subscriber
async def channel_example():
channel = Channel[str]("notifications")
# Subscribe to channel
subscriber = await channel.subscribe()
# Publish message
await channel.publish("Hello subscribers!")
# Receive message
message = await subscriber.receive()
Topic-based Events¶
Handle events by topic:
from mtaio.events import Channel
async def topic_example():
channel = Channel[str]("events")
# Subscribe to specific topics
subscriber = await channel.subscribe(["user.*", "system.*"])
# Publish to topic
await channel.publish(
"User logged in",
topic="user.login"
)
Error Handling¶
Event Errors¶
Handle event-related errors:
from mtaio.exceptions import (
EventError,
EventEmitError,
EventHandlerError
)
async def handle_errors():
try:
await emitter.emit("event", data)
except EventEmitError:
# Handle emission error
pass
except EventHandlerError:
# Handle handler error
pass
Error Events¶
Emit error events:
@emitter.on("error")
async def handle_error(event):
error = event.data
print(f"Error occurred: {error}")
# Emit error event
try:
await process_data()
except Exception as e:
await emitter.emit("error", e)
Best Practices¶
Event Handler Organization¶
class UserEventHandlers:
def __init__(self, emitter: EventEmitter):
self.emitter = emitter
self.setup_handlers()
def setup_handlers(self):
@self.emitter.on("user.created")
async def handle_user_created(event):
pass
@self.emitter.on("user.updated")
async def handle_user_updated(event):
pass
# Usage
handlers = UserEventHandlers(emitter)
Resource Management¶
class EventManager:
def __init__(self):
self.emitter = EventEmitter()
self._handlers = []
def register_handler(self, event: str, handler: Callable):
self._handlers.append((event, handler))
self.emitter.on(event)(handler)
async def cleanup(self):
for event, handler in self._handlers:
self.emitter.remove_listener(event, handler)
Performance Optimization¶
-
Event Batching
-
Priority Handling
-
Event Filtering
Examples¶
Event-driven Data Processing¶
from mtaio.events import EventEmitter
class DataProcessor:
def __init__(self):
self.emitter = EventEmitter()
async def process(self, data: dict):
# Emit preprocessing event
await self.emitter.emit("process.start", data)
try:
result = await self.process_data(data)
await self.emitter.emit("process.complete", result)
except Exception as e:
await self.emitter.emit("process.error", e)
Event Monitoring¶
class EventMonitor:
def __init__(self, emitter: EventEmitter):
@emitter.on("*", priority=EventPriority.MONITOR)
async def monitor_all(event):
await self.log_event(event)
async def log_event(self, event: Event):
print(f"Event: {event.name}, Data: {event.data}")
See Also¶
- Core API Reference for base functionality
- Decorators API Reference for decorators
- Examples Repository