Event-Driven Architecture
Ein Architekturmuster, bei dem Komponenten über Ereignisse (Events) kommunizieren statt direkt miteinander – für lose Kopplung, Skalierbarkeit und Reaktionsfähigkeit.
Ein Architektur-Pattern, bei dem der Zustand einer Anwendung nicht direkt gespeichert wird, sondern aus einer Sequenz von Events rekonstruiert wird.
Bei Event Sourcing speicherst du nicht den aktuellen Zustand, sondern alle Events, die zu diesem Zustand geführt haben. Der aktuelle Zustand wird durch Abspielen aller Events rekonstruiert.
Traditionell (State-basiert):
Datenbank: { balance: 150 }
→ Du weißt: Kontostand ist 150€
→ Du weißt NICHT: Wie kam es dazu?
Event Sourcing:
Event Store:
1. AccountCreated { balance: 0 }
2. MoneyDeposited { amount: 200 }
3. MoneyWithdrawn { amount: 50 }
→ Replay: 0 + 200 - 50 = 150€
→ Du weißt ALLES: Wann, was, warum
Vorteile:
| Aspekt | Traditionell | Event Sourcing |
|---|---|---|
| Audit | Manuell implementieren | Eingebaut |
| Debugging | ”Wie kam es dazu?” | Replay und nachvollziehen |
| Undo | Schwierig | Event rückgängig machen |
| Zeitreisen | Unmöglich | Zustand zu jedem Zeitpunkt |
| Analytics | Separates System | Events direkt auswerten |
┌─────────────────────────────────────────────────────┐
│ Event Store │
├──────────┬──────────┬───────────────────────────────┤
│ Event ID │ Stream │ Event Data │
├──────────┼──────────┼───────────────────────────────┤
│ 1 │ order-1 │ OrderCreated { items: [...] } │
│ 2 │ order-1 │ ItemAdded { sku: "ABC" } │
│ 3 │ order-1 │ ItemRemoved { sku: "XYZ" } │
│ 4 │ order-1 │ OrderSubmitted { total: 99 } │
│ 5 │ order-2 │ OrderCreated { items: [...] } │
└──────────┴──────────┴───────────────────────────────┘
Streams: Gruppieren Events zu einer Entität (z.B. eine Bestellung).
from dataclasses import dataclass
from datetime import datetime
from typing import List
@dataclass
class Event:
event_id: str
timestamp: datetime
@dataclass
class OrderCreated(Event):
order_id: str
customer_id: str
items: List[dict]
@dataclass
class ItemAdded(Event):
order_id: str
sku: str
quantity: int
price: float
@dataclass
class OrderSubmitted(Event):
order_id: str
total: float
class Order:
def __init__(self, order_id: str):
self.order_id = order_id
self.items = []
self.status = "draft"
self.total = 0
def apply(self, event: Event):
"""Event auf Zustand anwenden"""
if isinstance(event, OrderCreated):
self.items = event.items
elif isinstance(event, ItemAdded):
self.items.append({
"sku": event.sku,
"quantity": event.quantity,
"price": event.price
})
elif isinstance(event, OrderSubmitted):
self.status = "submitted"
self.total = event.total
@classmethod
def from_events(cls, order_id: str, events: List[Event]):
"""Zustand aus Events rekonstruieren"""
order = cls(order_id)
for event in events:
order.apply(event)
return order
class OrderService:
def __init__(self, event_store):
self.event_store = event_store
def add_item(self, order_id: str, sku: str, quantity: int, price: float):
# 1. Aktuelle Events laden
events = self.event_store.get_events(f"order-{order_id}")
# 2. Zustand rekonstruieren
order = Order.from_events(order_id, events)
# 3. Business Logic prüfen
if order.status != "draft":
raise Exception("Cannot modify submitted order")
# 4. Neues Event erstellen und speichern
event = ItemAdded(
event_id=uuid4(),
timestamp=datetime.now(),
order_id=order_id,
sku=sku,
quantity=quantity,
price=price
)
self.event_store.append(f"order-{order_id}", event)
Bei vielen Events wird Replay langsam. Lösung: Snapshots.
class SnapshotStore:
def save_snapshot(self, stream_id: str, state: dict, version: int):
# Aktuellen Zustand speichern
...
def get_snapshot(self, stream_id: str):
# Letzten Snapshot laden
...
def load_order(order_id: str):
# 1. Snapshot laden (falls vorhanden)
snapshot = snapshot_store.get_snapshot(f"order-{order_id}")
if snapshot:
order = Order.from_snapshot(snapshot.state)
# Nur Events seit Snapshot laden
events = event_store.get_events_since(
f"order-{order_id}",
snapshot.version
)
else:
order = Order(order_id)
events = event_store.get_events(f"order-{order_id}")
# 2. Restliche Events anwenden
for event in events:
order.apply(event)
return order
Oft kombiniert:
Commands → Event Store → Events
↓
Projections
↓
Read Models (optimiert für Queries)
Projections: Transformieren Events in lesbare Views.
class OrderListProjection:
def __init__(self, read_db):
self.read_db = read_db
def handle(self, event: Event):
if isinstance(event, OrderCreated):
self.read_db.insert("orders", {
"id": event.order_id,
"customer": event.customer_id,
"status": "draft"
})
elif isinstance(event, OrderSubmitted):
self.read_db.update("orders",
{"id": event.order_id},
{"status": "submitted", "total": event.total}
) Event Sourcing ist wie ein Kontoauszug: Statt nur den aktuellen Kontostand zu speichern, hast du alle Transaktionen. Du kannst jederzeit nachvollziehen, wie der Stand zustande kam – und den Zustand zu jedem Zeitpunkt rekonstruieren.
Speichert Events (was passiert ist) statt aktuellen Zustand
Vollständige Audit-Historie und Nachvollziehbarkeit
Zustand wird durch Replay aller Events rekonstruiert
Finanzanwendungen
Vollständige Transaktionshistorie für Audit und Compliance
Marketing
Bestellhistorie, Warenkorbänderungen nachvollziehen
Collaboration Tools
Dokumenten-Historie, Undo/Redo-Funktionalität
Gaming
Spielstände rekonstruieren, Replays ermöglichen
Wenn Audit-Trail wichtig ist, wenn du Zustand zu jedem Zeitpunkt rekonstruieren musst, wenn du komplexe Domain-Logik hast. Nicht für einfache CRUD-Anwendungen.
Ja, deshalb gibt es Snapshots: Periodisch den aktuellen Zustand speichern und nur Events seit dem letzten Snapshot replaying. Alte Events können archiviert werden.
Event-Driven: Events zur Kommunikation zwischen Services. Event Sourcing: Events als primäre Datenquelle. Oft kombiniert, aber unterschiedliche Konzepte.
Event Versioning: Alte Events bleiben, neue haben neues Schema. Upcasting: Alte Events beim Lesen in neues Format transformieren. Erfordert sorgfältige Planung.