[Phase 4] Implement event bus (Redis) #18

Closed
opened 2025-12-21 13:04:06 +00:00 by Damien · 2 comments
Owner

Description

Implement an event bus using Redis to decouple event producers from consumers and enable async processing.

Tasks

  • Set up Redis connection management
  • Define event types and schemas
  • Implement event publishing (pub)
  • Implement event subscription (sub)
  • Add event persistence for audit trail
  • Implement retry logic for failed handlers
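
The retry task above could be approached along these lines. This is a minimal sketch, not the project's implementation: `call_with_retry`, `max_attempts`, and `base_delay` are hypothetical names chosen for illustration, and the exponential backoff policy is an assumption, since the issue does not specify one.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def call_with_retry(
    handler: Callable[[Any], Awaitable[None]],
    event: Any,
    max_attempts: int = 3,   # hypothetical default, not from the issue
    base_delay: float = 0.1,  # seconds; hypothetical default
) -> bool:
    """Run handler(event), retrying with exponential backoff on any exception.

    Returns True as soon as one attempt succeeds, False once all attempts fail.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            await handler(event)
            return True
        except Exception:
            if attempt == max_attempts:
                return False
            # The delay doubles after each failure: base_delay, 2*base_delay, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    return False
```

Returning a boolean instead of re-raising leaves the caller free to decide what to do with an event whose handler never succeeded, for example recording it in the audit trail.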

Event Types

from enum import Enum


class EventType(Enum):
    # Intent events
    INTENT_CHANGED = "intent.changed"
    INTENT_VALIDATED = "intent.validated"
    
    # Reconciliation events
    PLAN_GENERATED = "reconciler.plan_generated"
    APPLY_STARTED = "reconciler.apply_started"
    APPLY_COMPLETED = "reconciler.apply_completed"
    APPLY_FAILED = "reconciler.apply_failed"
    
    # Drift events
    DRIFT_DETECTED = "drift.detected"
    DRIFT_REMEDIATED = "drift.remediated"
    
    # Device events
    DEVICE_CONNECTED = "device.connected"
    DEVICE_DISCONNECTED = "device.disconnected"

Event Schema

from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class Event:
    id: str
    type: EventType
    timestamp: datetime
    source: str
    data: dict
    correlation_id: Optional[str] = None
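
To travel over Redis, an event has to be turned into a string or bytes payload. The sketch below shows one possible JSON round trip for the schema above. The wire format and the helper names `event_to_json` and `event_from_json` are assumptions made for illustration, and only a subset of `EventType` is repeated so that the block runs on its own.

```python
import json
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Optional


class EventType(Enum):
    # Subset of the issue's event types, repeated so the sketch is self-contained
    DRIFT_DETECTED = "drift.detected"
    DRIFT_REMEDIATED = "drift.remediated"


@dataclass
class Event:
    id: str
    type: EventType
    timestamp: datetime
    source: str
    data: dict
    correlation_id: Optional[str] = None


def event_to_json(event: Event) -> str:
    """Encode an Event as a JSON string (hypothetical wire format)."""
    return json.dumps({
        "id": event.id,
        "type": event.type.value,                  # enum stored by its string value
        "timestamp": event.timestamp.isoformat(),  # ISO 8601 text
        "source": event.source,
        "data": event.data,                        # must itself be JSON-serializable
        "correlation_id": event.correlation_id,
    })


def event_from_json(payload: str) -> Event:
    """Decode a JSON string produced by event_to_json back into an Event."""
    raw = json.loads(payload)
    return Event(
        id=raw["id"],
        type=EventType(raw["type"]),
        timestamp=datetime.fromisoformat(raw["timestamp"]),
        source=raw["source"],
        data=raw["data"],
        correlation_id=raw.get("correlation_id"),
    )
```

Storing the enum by value keeps the payload readable to consumers that are not written in Python.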

Usage Example

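import logging
import uuid
from datetime import datetime, timezone

logger = logging.getLogger(__name__)

# Publishing (must run inside a coroutine; id and timestamp are required by the schema)
await event_bus.publish(Event(
    id=str(uuid.uuid4()),
    type=EventType.DRIFT_DETECTED,
    timestamp=datetime.now(timezone.utc),
    source="drift_monitor",
    data={"device": "leaf1", "path": "/vlans/vlan[id=40]"},
))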

# Subscribing
@event_bus.subscribe(EventType.DRIFT_DETECTED)
async def handle_drift(event: Event):
    logger.warning(f"Drift detected on {event.data['device']}")
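
The `publish` / `subscribe` interface used above can be illustrated without Redis. The following is a minimal in-process sketch, assuming a hypothetical `InMemoryEventBus` class. It only dispatches to handlers registered in the same process, so it is a stand-in for the decorator and dispatch mechanics, not for the Redis transport, persistence, or retries the issue asks for.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Handler = Callable[[Any], Awaitable[None]]


class InMemoryEventBus:
    """Hypothetical in-process bus mirroring the publish/subscribe API above."""

    def __init__(self) -> None:
        # Maps an event type to the handlers registered for it
        self._handlers: Dict[Any, List[Handler]] = {}

    def subscribe(self, event_type: Any) -> Callable[[Handler], Handler]:
        """Decorator that registers an async handler for one event type."""
        def decorator(handler: Handler) -> Handler:
            self._handlers.setdefault(event_type, []).append(handler)
            return handler  # the decorated function stays usable as-is
        return decorator

    async def publish(self, event: Any) -> None:
        """Call every handler registered for event.type concurrently."""
        handlers = self._handlers.get(event.type, [])
        await asyncio.gather(*(handler(event) for handler in handlers))
```

A Redis-backed version would keep the same two methods but replace the dictionary lookup in `publish` with a write to a Redis channel or stream, with a background task reading from Redis and invoking the handlers.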

Output

  • src/events/bus.py
  • src/events/types.py
Damien added the phase-4-event-driven label 2025-12-21 13:04:10 +00:00
Author
Owner

🔄 Migration to Kestra

This issue is now obsolete following the decision to use Kestra as the orchestration platform.

Replacement

Kestra has its own native event system:

Flow Triggers - Trigger a workflow when another one finishes:

triggers:
  - id: on_reconcile_complete
    type: io.kestra.plugin.core.trigger.Flow
    conditions:
      - type: io.kestra.plugin.core.condition.ExecutionStatus
        in: [SUCCESS, FAILED]

Task outputs - Passing data between tasks:

from kestra import Kestra
Kestra.outputs({"drift_detected": True, "device": "leaf1"})

Advantages

  • No Redis infrastructure to maintain
  • Native event persistence
  • UI for visualizing the event flow
  • Automatic correlation of executions

See the new Kestra issues for the implementation.

Author
Owner

🔄 Closing - Migration to Kestra

This issue is obsolete following the migration to Kestra as the orchestration engine.

Reason

Kestra has its own native event system:

  • Flow Triggers (io.kestra.plugin.core.trigger.Flow) for chaining workflows
  • Internal events for inter-task communication via outputs
  • Built-in notifications (Slack, Discord, Teams, etc.)

Implementing a custom Redis event bus is no longer necessary.

Kestra replacement

# Trigger a workflow when another one finishes
triggers:
  - id: on_drift_detected
    type: io.kestra.plugin.core.trigger.Flow
    conditions:
      - type: io.kestra.plugin.core.condition.ExecutionStatus
        in: [SUCCESS]
      - type: io.kestra.plugin.core.condition.ExecutionNamespace
        namespace: network.fabric