[Phase 4] Implement drift detection service with Kestra webhook #16

Open
opened 2025-12-20 15:46:04 +00:00 by Damien · 0 comments
Owner

Description

⚠️ Update: With Kestra, drift detection relies on an external service (a Python daemon or a container) that runs the gNMI Subscribe and triggers a Kestra webhook when drift is detected.

Implement a monitoring service that uses gNMI Subscribe to detect configuration changes in real time.

Architecture

┌─────────────────────────────────────────────────────────────────┐
│                    Drift Detection Service                       │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │  gNMI Subscribe (ON_CHANGE)                                 ││
│  │  - /network-instances/network-instance/vlans                ││
│  │  - /interfaces/interface                                    ││
│  │  - /arista/eos/arista-exp-eos-vxlan                        ││
│  └──────────────────────────┬──────────────────────────────────┘│
│                             │ Drift detected                    │
│                             ▼                                   │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │  Compare with cached desired state                          ││
│  │  If different → POST to Kestra webhook                      ││
│  └─────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼ HTTP POST
┌─────────────────────────────────────────────────────────────────┐
│                    Kestra Workflow                               │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │  Trigger: Webhook                                           ││
│  │  → Log drift event                                          ││
│  │  → Notify (Slack/Discord)                                   ││
│  │  → Optionally trigger remediation                           ││
│  └─────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────┘

Drift detection service

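# scripts/drift_monitor.py
import asyncio
import os

import httpx
from src.gnmi import GNMIClient

# Full webhook URL including the trigger key, read from the environment
# (set on the drift-monitor service in docker-compose.yml). Expected shape:
# http://kestra:8080/api/v1/executions/webhook/network.fabric/drift-detected/{key}
KESTRA_WEBHOOK_URL = os.environ["KESTRA_WEBHOOK_URL"]

SUBSCRIBE_PATHS = [
    "/openconfig-network-instance:network-instances",
    "/openconfig-interfaces:interfaces",
    "/arista/eos/arista-exp-eos-vxlan:arista-exp-eos-vxlan",
]

async def monitor_device(device: str, desired_state: dict):
    """Stream ON_CHANGE updates from one device and report any drift."""
    async with GNMIClient(target=f"{device}:6030") as client:
        async for update in client.subscribe(SUBSCRIBE_PATHS, mode="on_change"):
            # is_drift() is not defined yet: the comparison against the
            # desired state is listed under Tasks.
            if is_drift(update, desired_state):
                await notify_drift(device, update)

async def notify_drift(device: str, update: dict):
    """POST the drift event to the Kestra webhook trigger."""
    async with httpx.AsyncClient(timeout=10.0) as http:
        response = await http.post(KESTRA_WEBHOOK_URL, json={
            "device": device,
            "path": update["path"],
            "current_value": update["value"],
            "timestamp": update["timestamp"]
        })
        # Fail loudly if Kestra rejects the event (wrong key, unknown flow, ...).
        response.raise_for_status()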
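
The script above calls `is_drift`, which is not defined in this issue. A minimal sketch of that comparison follows. It assumes the desired state is a flat mapping from gNMI path string to expected value and that paths absent from the mapping are unmanaged. Both are assumptions, since the real cache layout is still to be decided:

```python
from typing import Any

# Sentinel so that an expected value of None can still be compared.
_MISSING = object()


def is_drift(update: dict[str, Any], desired_state: dict[str, Any]) -> bool:
    """Return True when the reported value differs from the desired one.

    Assumption: desired_state maps a gNMI path string to its expected value.
    Paths that are not in the mapping are treated as unmanaged and ignored.
    """
    expected = desired_state.get(update["path"], _MISSING)
    if expected is _MISSING:
        return False
    return update["value"] != expected
```

Ignoring unmanaged paths keeps the webhook quiet for state the fabric does not own. Treating them as drift instead would be a one-line change.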

Kestra workflow: drift-detected.yml

id: drift-detected
namespace: network.fabric
description: Handle drift detection events from gNMI Subscribe service

tasks:
  - id: log_drift
    type: io.kestra.plugin.core.log.Log
    message: |
      ⚠️ Drift detected on {{ trigger.body.device }}
      Path: {{ trigger.body.path }}
      Time: {{ trigger.body.timestamp }}

  - id: notify_slack
    type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
    url: "{{ secret('SLACK_WEBHOOK') }}"
    payload: |
      {
        "text": "⚠️ *Configuration Drift Detected*",
        "attachments": [{
          "color": "warning",
          "fields": [
            {"title": "Device", "value": "{{ trigger.body.device }}", "short": true},
            {"title": "Path", "value": "{{ trigger.body.path }}", "short": false}
          ]
        }]
      }

  - id: decide_remediation
    type: io.kestra.plugin.core.flow.If
    condition: "{{ secret('AUTO_REMEDIATION_ENABLED') == 'true' }}"
    then:
      - id: trigger_remediation
        type: io.kestra.plugin.core.flow.Subflow
        namespace: network.fabric
        flowId: fabric-reconcile
        inputs:
          device: "{{ trigger.body.device }}"
          auto_apply: true

triggers:
  - id: drift_webhook
    type: io.kestra.plugin.core.trigger.Webhook
    key: "{{ secret('DRIFT_WEBHOOK_KEY') }}"
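
On the sending side, the webhook URL and JSON body have to match this flow: the URL follows the `/api/v1/executions/webhook/{namespace}/{flowId}/{key}` pattern used in the monitor script, and the tasks read `trigger.body.device`, `trigger.body.path` and `trigger.body.timestamp`. A small sketch of helpers that build both is shown here. The function names `build_webhook_url` and `build_payload` are hypothetical:

```python
from typing import Any
from urllib.parse import quote


def build_webhook_url(base_url: str, namespace: str, flow_id: str, key: str) -> str:
    """Assemble the webhook endpoint for a flow's Webhook trigger."""
    parts = (quote(p, safe="") for p in (namespace, flow_id, key))
    return f"{base_url.rstrip('/')}/api/v1/executions/webhook/" + "/".join(parts)


def build_payload(device: str, path: str, value: Any, timestamp: int) -> dict[str, Any]:
    """Build the JSON body; the keys mirror what the flow reads from trigger.body."""
    return {
        "device": device,
        "path": path,
        "current_value": value,
        "timestamp": timestamp,
    }


url = build_webhook_url("http://kestra:8080", "network.fabric", "drift-detected", "example-key")
body = build_payload("leaf1", "/interfaces/interface[name=Ethernet1]/config/mtu", 1500, 1766245564)
```

Here `example-key`, `leaf1` and the timestamp are placeholder values; the real key is whatever the `DRIFT_WEBHOOK_KEY` secret holds.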

Docker Compose for the service

services:
  drift-monitor:
    build: .
    command: python scripts/drift_monitor.py
    environment:
      - KESTRA_WEBHOOK_URL=http://kestra:8080/api/v1/executions/webhook/...
      - GNMI_USERNAME=admin
      - GNMI_PASSWORD=admin
    depends_on:
      - kestra
    restart: unless-stopped
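
The three environment variables set on the service have to be read by the monitor at startup. One possible way to do that, sketched under the assumption that all three are mandatory (the `Settings` class and `load_settings` function are hypothetical names):

```python
import os
from dataclasses import dataclass
from typing import Mapping

REQUIRED_VARS = ("KESTRA_WEBHOOK_URL", "GNMI_USERNAME", "GNMI_PASSWORD")


@dataclass(frozen=True)
class Settings:
    kestra_webhook_url: str
    gnmi_username: str
    gnmi_password: str


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Read the service configuration, failing fast on missing variables."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing environment variables: {', '.join(missing)}")
    return Settings(
        kestra_webhook_url=env["KESTRA_WEBHOOK_URL"],
        gnmi_username=env["GNMI_USERNAME"],
        gnmi_password=env["GNMI_PASSWORD"],
    )
```

Accepting the environment as a parameter lets tests pass a plain dict instead of patching `os.environ`.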

Tasks

  • Implement scripts/drift_monitor.py with gNMI Subscribe
  • Add the desired-state cache (sourced from NetBox)
  • Implement the comparison between observed state and desired state
  • Create the Kestra workflow drift-detected.yml
  • Add the service to docker-compose.yml
  • Test with a manual change on a device
  • Document the setup
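
For the desired-state cache task, one possible shape is a per-device cache with a time-to-live, so NetBox is not queried on every gNMI update. This is only a sketch: the `DesiredStateCache` class, the 300-second default TTL and the injected `loader` callable (standing in for the NetBox query, which is not specified here) are all assumptions:

```python
import time
from typing import Any, Callable


class DesiredStateCache:
    """Cache desired state per device and reload it once the TTL expires."""

    def __init__(
        self,
        loader: Callable[[str], dict[str, Any]],
        ttl_seconds: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._loader = loader  # hypothetical: fetches desired state for one device
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: dict[str, tuple[float, dict[str, Any]]] = {}

    def get(self, device: str) -> dict[str, Any]:
        """Return the cached state, calling the loader if absent or stale."""
        now = self._clock()
        entry = self._entries.get(device)
        if entry is not None and now - entry[0] < self._ttl:
            return entry[1]
        state = self._loader(device)
        self._entries[device] = (now, state)
        return state

    def invalidate(self, device: str) -> None:
        """Drop a device's entry, for example after a reconcile run."""
        self._entries.pop(device, None)
```

Injecting the loader and the clock keeps the cache testable without a NetBox instance or real waiting.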

Subscribe Paths

SUBSCRIBE_PATHS = [
    # VLANs
    "/openconfig-network-instance:network-instances/network-instance[name=default]/vlans",
    # Interfaces
    "/openconfig-interfaces:interfaces/interface/config",
    # VXLAN
    "/arista/eos/arista-exp-eos-vxlan:arista-exp-eos-vxlan/config",
    # BGP
    "/network-instances/network-instance[name=default]/protocols/protocol[identifier=BGP]/bgp",
]
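
The paths above mix module-prefixed elements (`openconfig-network-instance:network-instances`) with unprefixed ones (the BGP entry), so comparing update paths against desired-state keys may need a normalization step. A rough sketch follows, assuming plain string paths with no escaped brackets inside key values. The helper names are hypothetical:

```python
def split_path(path: str) -> list[str]:
    """Split a gNMI path on '/' characters that sit outside [key=value] brackets."""
    elems: list[str] = []
    current: list[str] = []
    depth = 0
    for ch in path:
        if ch == "[":
            depth += 1
        elif ch == "]":
            depth -= 1
        if ch == "/" and depth == 0:
            if current:
                elems.append("".join(current))
                current = []
        else:
            current.append(ch)
    if current:
        elems.append("".join(current))
    return elems


def strip_module_prefix(elem: str) -> str:
    """Turn 'module:name[key=v]' into 'name[key=v]', leaving key values untouched."""
    name, bracket, keys = elem.partition("[")
    return name.rsplit(":", 1)[-1] + bracket + keys


def normalize_path(path: str) -> str:
    """Return the path with module prefixes removed from every element."""
    return "/" + "/".join(strip_module_prefix(e) for e in split_path(path))
```

Splitting outside brackets matters for interface names such as `Ethernet1/1`, which contain a slash inside the key value.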

Output

  • scripts/drift_monitor.py
  • kestra/flows/drift-detected.yml
  • docker-compose.yml (service added)
Damien added the phase-4-event-driven label 2025-12-20 15:46:49 +00:00
Damien changed title from [Phase 4] Implement gNMI Subscribe for drift detection to [Phase 4] Implement drift detection service with Kestra webhook 2026-01-10 13:10:04 +00:00