[Phase 4] Implement Drift Detection and Auto-Remediation #36

Open
opened 2026-01-30 13:22:56 +00:00 by Damien · 0 comments
Owner

Description

Implement drift detection using gNMI Subscribe to monitor device configuration in real time and trigger remediation when unauthorized changes occur.

Context

Configuration drift happens when a device's running configuration diverges from the intended state (e.g., manual CLI changes, device bugs). With gNMI ON_CHANGE subscriptions, we can detect drift in real time and optionally auto-remediate by re-applying intent from InfraHub.
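
As a rough illustration of what "diverges from the intended state" means in practice, the sketch below compares an intended set of leaf values against the observed one. The flat path-to-value dictionaries and the `find_drift` name are assumptions made for this example; they are not part of the project.

```python
from typing import Any


def find_drift(intended: dict[str, Any], actual: dict[str, Any]) -> dict[str, tuple[Any, Any]]:
    """Return {path: (intended_value, actual_value)} for every leaf that differs.

    Only paths present in the intent are checked, so leaves that the intent
    source does not manage are never reported as drift.
    """
    missing = object()  # sentinel: distinguishes "absent" from a real None value
    drift = {}
    for path, want in intended.items():
        have = actual.get(path, missing)
        if have is missing:
            drift[path] = (want, None)  # leaf was removed on the device
        elif have != want:
            drift[path] = (want, have)  # leaf was changed on the device
    return drift


intended = {
    "/interfaces/interface[name=Ethernet1]/config/description": "uplink to spine1",
    "/interfaces/interface[name=Ethernet1]/config/enabled": True,
}
actual = {
    "/interfaces/interface[name=Ethernet1]/config/description": "temp change",
    "/interfaces/interface[name=Ethernet1]/config/enabled": True,
}
print(find_drift(intended, actual))
```

Restricting the comparison to paths that exist in the intent mirrors the `expected is not None` guard in the monitor: anything InfraHub does not model is ignored rather than flagged.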

Tasks

  • Create DriftMonitor service using gNMI Subscribe
  • Implement ON_CHANGE subscriptions for key paths
  • Create @flow handle_drift to process drift events
  • Create @flow drift_remediation for auto-fix
  • Add configurable remediation policies (alert-only, auto-fix)
  • Implement drift event logging
  • Add CLI command fabric-orch drift status
  • Support pause_flow_run() for human approval

Architecture

┌─────────────────────┐
│    Fabric Device    │
│    (gNMI Server)    │
└──────────┬──────────┘
           │ gNMI Subscribe (ON_CHANGE)
           ▼
┌─────────────────────┐
│   Drift Monitor     │
│   (Long-running)    │
└──────────┬──────────┘
           │ Drift Event
           ▼
┌─────────────────────┐
│  handle_drift Flow  │
│  - Log event        │
│  - Check policy     │
│  - Trigger remediate│
└──────────┬──────────┘
           │ If auto_remediate
           ▼
┌─────────────────────┐
│ drift_remediation   │
│  - Get intent from  │
│    InfraHub         │
│  - Compute fix      │
│  - Apply via gNMI   │
└─────────────────────┘
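
The Drift Monitor box above is a long-running process, so a dropped gNMI session should not end monitoring for that device. One possible approach, sketched here under assumptions, is to wrap each per-device stream in a retry loop with exponential backoff. `run_with_backoff` and its parameters are hypothetical names, and the sketch never resets the delay after a long healthy period, which a real service would probably want.

```python
import asyncio
import random
from collections.abc import Awaitable, Callable
from typing import Optional


async def run_with_backoff(
    name: str,
    stream: Callable[[], Awaitable[None]],
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    max_attempts: Optional[int] = None,
) -> int:
    """Re-run `stream` whenever it raises, sleeping with exponential backoff.

    Returns the number of failed attempts once `stream` finishes cleanly or
    `max_attempts` failures have occurred. `max_attempts=None` retries forever.
    asyncio.CancelledError is not an Exception subclass, so cancellation
    still propagates and shuts the loop down.
    """
    failures = 0
    while True:
        try:
            await stream()
            return failures
        except Exception as exc:
            failures += 1
            if max_attempts is not None and failures >= max_attempts:
                return failures
            # Double the delay on each failure, capped at max_delay.
            delay = min(max_delay, base_delay * 2 ** (failures - 1))
            # Jitter keeps many devices from reconnecting in lockstep.
            delay *= random.uniform(0.5, 1.0)
            print(f"{name}: stream failed ({exc!r}); retrying in {delay:.1f}s")
            await asyncio.sleep(delay)
```

In the monitor, each device's subscription coroutine could be passed as `stream`, so that one failing device does not cause `asyncio.gather` to surface an exception for the whole service.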

Drift Monitor Service

import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any

from src.gnmi import GnmiClient


@dataclass
class DriftEvent:
    device: str
    path: str
    expected_value: Any
    actual_value: Any
    timestamp: datetime


class DriftMonitor:
    """Monitor fabric devices for configuration drift using gNMI Subscribe."""

    # Paths to monitor for drift
    MONITORED_PATHS = [
        "/interfaces/interface/config",
        "/network-instances/network-instance[name=default]/vlans/vlan/config",
        "/network-instances/network-instance[name=default]/protocols/protocol[identifier=BGP]/bgp/neighbors/neighbor/config",
    ]

    def __init__(self, devices: list[str], intent_cache: dict):
        self.devices = devices
        self.intent_cache = intent_cache  # Expected state from InfraHub
        self.subscriptions = {}

    async def start(self):
        """Start monitoring all devices."""
        tasks = [self._monitor_device(device) for device in self.devices]
        await asyncio.gather(*tasks)

    async def _monitor_device(self, device: str):
        """Subscribe to config changes on a device."""
        async with GnmiClient(target=f"{device}:6030") as client:
            async for update in client.subscribe(
                paths=self.MONITORED_PATHS,
                mode="ON_CHANGE",
            ):
                await self._handle_update(device, update)

    def _get_expected_value(self, device: str, path: str) -> Any:
        """Return the intended value for a path, or None if it is unmanaged.

        Assumption: intent_cache is shaped as {device: {path: value}}.
        """
        return self.intent_cache.get(device, {}).get(path)

    async def _handle_update(self, device: str, update: dict):
        """Check if update represents drift from intent."""
        path = update["path"]
        new_value = update["value"]

        expected = self._get_expected_value(device, path)

        # Paths with no intent (expected is None) are ignored, not flagged.
        if expected is not None and new_value != expected:
            event = DriftEvent(
                device=device,
                path=path,
                expected_value=expected,
                actual_value=new_value,
                # Timezone-aware UTC; datetime.utcnow() is deprecated.
                timestamp=datetime.now(timezone.utc),
            )
            await self._emit_drift_event(event)
    
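    async def _emit_drift_event(self, event: DriftEvent):
        """Trigger drift handling flow."""
        from prefect.deployments import run_deployment

        # Called from async code, run_deployment must be awaited.
        # timeout=0 returns once the flow run is created, so the
        # subscription loop is not blocked until the flow finishes.
        await run_deployment(
            name="handle-drift/handle-drift-deployment",
            parameters={
                "device": event.device,
                "path": event.path,
                "expected": event.expected_value,
                "actual": event.actual_value,
            },
            timeout=0,
        )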
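
gNMI updates typically arrive with concrete list keys (for example `interface[name=Ethernet1]`), while the entries in MONITORED_PATHS are written without keys or with fixed ones. Intent lookups therefore need some form of path matching. The sketch below shows one way to do it; `parse_path` and `path_matches` are hypothetical helpers, and the parser assumes key values never contain a `]` character.

```python
import re

# One path element: a name followed by zero or more [key=value] predicates.
_ELEM = re.compile(r"/([^/\[\]]+)((?:\[[^\]]*\])*)")
_KEY = re.compile(r"\[([^=\]]+)=([^\]]*)\]")


def parse_path(path: str) -> list[tuple[str, dict[str, str]]]:
    """Split '/a/b[k=v]/c' into [(name, {key: value}), ...].

    Slashes inside key values (e.g. name=Ethernet3/1) stay part of the key.
    """
    return [(name, dict(_KEY.findall(keys))) for name, keys in _ELEM.findall(path)]


def path_matches(pattern: str, path: str) -> bool:
    """True if `path` equals `pattern` or lies beneath it.

    A key in the pattern must match exactly unless its value is '*'.
    Elements without keys in the pattern match any key values in the path.
    """
    want, have = parse_path(pattern), parse_path(path)
    if len(have) < len(want):
        return False
    for (want_name, want_keys), (have_name, have_keys) in zip(want, have):
        if want_name != have_name:
            return False
        for key, value in want_keys.items():
            if value != "*" and have_keys.get(key) != value:
                return False
    return True


update_path = "/interfaces/interface[name=Ethernet3/1]/config/description"
print(path_matches("/interfaces/interface/config", update_path))
```

Splitting on `/` alone would break on interface names such as `Ethernet3/1`, which is why the sketch parses element by element.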

Prefect Drift Flows

from typing import Any, Optional

from prefect import flow, pause_flow_run, task
from prefect.input import RunInput


class DriftApproval(RunInput):
    """Input for human-in-the-loop drift approval."""
    approved: bool
    notes: str = ""


@flow(name="handle-drift", log_prints=True)
def handle_drift(
    device: str,
    path: str,
    expected: Any,
    actual: Any,
    auto_remediate: bool = False,
):
    """
    Handle a drift event detected by the monitor.

    Policies:
    - alert_only: Log and notify, no action
    - manual: Pause for human approval
    - auto: Automatically remediate

    Only manual (auto_remediate=False) and auto (auto_remediate=True) are
    wired up here; alert_only is still to be added.
    """
    print(f"⚠️ Drift detected on {device}")
    print(f"   Path: {path}")
    print(f"   Expected: {expected}")
    print(f"   Actual: {actual}")

    # Log drift event
    log_drift_event(device, path, expected, actual)

    if not auto_remediate:
        # Pause for human approval; returns the submitted DriftApproval
        approval = pause_flow_run(
            wait_for_input=DriftApproval.with_initial_data(
                description=f"Drift detected on {device} at {path}. Remediate?",
            )
        )

        if not approval.approved:
            print(f"❌ Remediation declined: {approval.notes}")
            return {"remediated": False, "reason": "declined"}

    # Trigger remediation
    return drift_remediation(device=device, paths=[path])


@flow(name="drift-remediation", log_prints=True)
def drift_remediation(device: str, paths: Optional[list[str]] = None):
    """
    Remediate drift by re-applying intent from InfraHub.

    paths lists the drifted paths to remediate. It is not yet forwarded to
    fabric_reconcile, so the call below always reconciles the full device.
    """
    from src.flows.reconcile import fabric_reconcile

    print(f"🔧 Remediating drift on {device}")

    result = fabric_reconcile(
        device=device,
        auto_apply=True,
        dry_run=False,
    )

    if result["applied"]:
        print(f"✅ Drift remediated on {device}")

    return result


@task
def log_drift_event(device: str, path: str, expected: Any, actual: Any):
    """Log drift event for audit trail."""
    import structlog

    logger = structlog.get_logger()
    logger.warning(
        "drift_detected",
        device=device,
        path=path,
        expected_value=expected,
        actual_value=actual,
    )
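
The handle_drift docstring names three policies, while the flow signature only carries a boolean. One way to close that gap, shown here as a sketch rather than the project's design, is an enum plus a small decision function. `RemediationPolicy` and `next_action` are hypothetical names, and the returned strings are placeholders for whatever steps the flow would actually run.

```python
from enum import Enum


class RemediationPolicy(str, Enum):
    """Policies from the handle_drift docstring; values are plain strings."""
    ALERT_ONLY = "alert_only"
    MANUAL = "manual"
    AUTO = "auto"


def next_action(policy: RemediationPolicy) -> str:
    """Map a policy to the step taken after the drift event is logged."""
    if policy is RemediationPolicy.ALERT_ONLY:
        return "notify"              # log and notify, never touch the device
    if policy is RemediationPolicy.MANUAL:
        return "pause_for_approval"  # wait for a human decision
    return "remediate"               # re-apply intent immediately


print(next_action(RemediationPolicy("manual")))
```

Deriving the enum from `str` keeps the values easy to pass as flow parameters or to read from configuration, since a policy can be built directly from its string form.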

CLI Integration

# Check drift status across fabric
fabric-orch drift status

# Output:
# Drift Monitor Status
# ════════════════════════════════════════
# 
# Devices Monitored: 10
# Active Subscriptions: 10
# Intent Source: InfraHub (branch: main)
# 
# Recent Drift Events (last 24h):
#   leaf1: 2 events (1 remediated, 1 pending)
#   leaf3: 1 event (auto-remediated)
# 
# No active drift detected

# View drift history
fabric-orch drift history --device leaf1

# Manually trigger remediation
fabric-orch drift remediate --device leaf1
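
The three subcommands above could be wired up along the following lines. This is only a sketch using the standard library's argparse; the issue does not say which CLI framework fabric-orch uses, and `build_parser` is a hypothetical name.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build a parser for `fabric-orch drift {status,history,remediate}`."""
    parser = argparse.ArgumentParser(prog="fabric-orch")
    commands = parser.add_subparsers(dest="command", required=True)

    drift = commands.add_parser("drift", help="Drift detection commands")
    actions = drift.add_subparsers(dest="action", required=True)

    actions.add_parser("status", help="Show monitor status and recent events")

    history = actions.add_parser("history", help="Show drift history")
    history.add_argument("--device", help="Limit output to one device")

    # Remediation changes device state, so the target must be explicit.
    remediate = actions.add_parser("remediate", help="Trigger remediation")
    remediate.add_argument("--device", required=True)

    return parser


args = build_parser().parse_args(["drift", "history", "--device", "leaf1"])
print(args.command, args.action, args.device)
```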

Output Files

  • src/services/__init__.py
  • src/services/drift_monitor.py
  • src/flows/drift.py
  • src/flows/remediation.py

gNMI Subscribe Notes

Based on Phase 1 findings, ON_CHANGE subscriptions work best with native YANG paths (not OpenConfig). Key paths to monitor:

# Working ON_CHANGE paths (Arista native)
"/interfaces/interface[name=*]/state/oper-status"
"/interfaces/interface[name=*]/config/description"
"/network-instances/network-instance/vlans/vlan/config"

# May need SAMPLE mode (OpenConfig compatibility issues)
"/network-instances/network-instance/protocols/protocol/bgp"

Acceptance Criteria

  • Drift monitor runs as long-lived service
  • ON_CHANGE subscriptions detect config changes
  • Drift events trigger Prefect flow
  • Human-in-the-loop approval with pause_flow_run()
  • Auto-remediation re-applies intent from InfraHub
  • Drift events are logged for audit

Related

  • Depends on: #29 (Plan/Apply CLI), #38 (Prefect Setup)
  • Uses: gNMI Subscribe (Phase 1 validated)
  • Uses: InfraHub as intent source
  • Reference: Prefect pause_flow_run (https://docs.prefect.io/latest/develop/pause-resume/)
Damien added the phase-4-event-driven label 2026-01-30 13:25:57 +00:00
Damien added this to the Fabric Orchestrator project 2026-02-05 09:05:56 +00:00