""" gNMI Client Wrapper for Fabric Orchestrator. This module provides a high-level gNMI client using pygnmi with: - Connection management - Error handling with meaningful exceptions """ from __future__ import annotations import json from dataclasses import dataclass from typing import Any, Literal from pygnmi.client import gNMIclient # ============================================================================= # Exceptions # ============================================================================= class GNMIError(Exception): """Base exception for gNMI operations.""" pass class GNMIConnectionError(GNMIError): """Raised when connection to device fails.""" pass class GNMIPathError(GNMIError): """Raised when path is invalid or not found.""" pass # ============================================================================= # Data Types # ============================================================================= DataType = Literal["config", "state", "all"] SetOperation = Literal["update", "replace", "delete"] SubscribeMode = Literal["once", "stream", "poll"] StreamMode = Literal["on-change", "sample", "target-defined"] @dataclass class Capability: """Represents a gNMI capability/model.""" name: str organization: str version: str def __str__(self) -> str: return f"{self.name} ({self.organization}) v{self.version}" @dataclass class SubscriptionUpdate: """Represents a subscription update.""" path: str value: Any timestamp: int # ============================================================================= # gNMI Client # ============================================================================= class GNMIClient: """ gNMI Client wrapper using pygnmi. Provides high-level methods for gNMI operations with proper error handling and context manager support. Or synchronously: with GNMIClient(host="leaf1", port=6030, ...) as client: caps = client.capabilities() """ def __init__( self, host: str, port: int = 6030, username: str = "admin", password: str = "admin", insecure: bool = True, skip_verify: bool = True, timeout: int = 10, ): """ Initialize gNMI client. Args: host: Target hostname or IP port: gNMI port (default: 6030) username: Username for authentication password: Password for authentication insecure: Skip TLS verification (default: True for lab) skip_verify: Skip certificate verification timeout: Connection timeout in seconds """ self.host = host self.port = port self.username = username self.password = password self.insecure = insecure self.skip_verify = skip_verify self.timeout = timeout self._client: gNMIclient | None = None @property def target(self) -> str: """Get target string (host:port).""" return f"{self.host}:{self.port}" def _ensure_connected(self) -> gNMIclient: """Ensure client is connected.""" if self._client is None: raise GNMIConnectionError("Client not connected. Use context manager.") return self._client def __enter__(self) -> GNMIClient: """Enter context manager (sync).""" try: self._client = gNMIclient( target=(self.host, self.port), username=self.username, password=self.password, insecure=self.insecure, skip_verify=self.skip_verify, ) self._client.__enter__() return self except Exception as e: raise GNMIConnectionError( f"Failed to connect to {self.target}: {e}" ) from e def __exit__(self, exc_type, exc_val, exc_tb) -> None: """Exit context manager (sync).""" if self._client: try: self._client.__exit__(exc_type, exc_val, exc_tb) except Exception: pass # Ignore cleanup errors self._client = None # ========================================================================= # gNMI Operations # ========================================================================= def capabilities(self) -> dict[str, Any]: """ Get device capabilities. Returns: Dictionary containing: - gnmi_version: gNMI protocol version - supported_models: List of supported YANG models - supported_encodings: List of supported encodings """ client = self._ensure_connected() try: result = client.capabilities() return result except Exception as e: raise GNMIError(f"Failed to get capabilities: {e}") from e def get_models(self) -> list[Capability]: """ Get list of supported YANG models. Returns: List of Capability objects """ caps = self.capabilities() models = [] for model in caps.get("supported_models", []): models.append( Capability( name=model.get("name", ""), organization=model.get("organization", ""), version=model.get("version", ""), ) ) return models def get( self, path: str | list[str], data_type: DataType = "all", encoding: str = "json_ietf", ) -> dict[str, Any]: """ Get data at path. Args: path: YANG path or list of paths data_type: Type of data to retrieve: - "config": Configuration data only - "state": State/operational data only - "all": Both config and state (default) encoding: Data encoding (default: json_ietf) Returns: Dictionary with path data """ client = self._ensure_connected() paths = [path] if isinstance(path, str) else path # Map data_type to pygnmi datatype parameter datatype_map = { "config": "config", "state": "state", "all": "all", } datatype = datatype_map.get(data_type, "all") try: result = client.get(path=paths, datatype=datatype, encoding=encoding) return result except Exception as e: error_msg = str(e).lower() if "not found" in error_msg or "invalid" in error_msg: raise GNMIPathError(f"Path not found or invalid: {path}") from e raise GNMIError(f"Failed to get data at {path}: {e}") from e def set( self, path: str, value: Any, operation: SetOperation = "update", encoding: str = "json_ietf", dry_run: bool = True, ) -> dict[str, Any]: """ Set configuration at path. Args: path: YANG path value: Value to set (dict or JSON string) operation: Set operation type: - "update": Merge with existing config - "replace": Replace existing config - "delete": Delete config at path encoding: Data encoding (default: json_ietf) dry_run: If True, only validate without applying (default: True) Returns: Result of set operation """ if dry_run: # For dry-run, just return what would be set return { "dry_run": True, "operation": operation, "path": path, "value": value, "message": "Dry-run mode - no changes applied", } client = self._ensure_connected() # Parse value if it's a JSON string if isinstance(value, str): try: value = json.loads(value) except json.JSONDecodeError: pass # Keep as string - not valid JSON, will be set as string value try: if operation == "delete": result = client.set(delete=[path]) elif operation == "replace": result = client.set(replace=[(path, value)]) else: # update result = client.set(update=[(path, value)]) return result except Exception as e: raise GNMIError(f"Failed to set {path}: {e}") from e def subscribe( self, paths: str | list[str], mode: SubscribeMode = "stream", stream_mode: StreamMode = "on-change", sample_interval: int = 10, encoding: str = "json_ietf", ) -> Any: """ Subscribe to path updates. Args: paths: YANG path(s) to subscribe to mode: Subscription mode: - "once": Get data once and close - "stream": Continuous updates - "poll": Poll on demand stream_mode: Stream mode (for mode="stream"): - "on-change": Update on value change - "sample": Periodic sampling - "target-defined": Device decides sample_interval: Interval in seconds for sample mode encoding: Data encoding Yields: Subscription updates """ client = self._ensure_connected() path_list = [paths] if isinstance(paths, str) else paths # Build subscription list subscribe_list = [] for p in path_list: sub = { "path": p, "mode": stream_mode.replace("-", "_"), # on-change -> on_change } if stream_mode == "sample": sub["sample_interval"] = sample_interval * 1_000_000_000 # nanoseconds subscribe_list.append(sub) subscribe_request = { "subscription": subscribe_list, "mode": mode, "encoding": encoding, } try: return client.subscribe2(subscribe=subscribe_request) except Exception as e: raise GNMIError(f"Failed to subscribe to {paths}: {e}") from e # ========================================================================= # Utility Methods # ========================================================================= def explore(self, path: str = "/", depth: int | None = None) -> dict[str, Any]: """ Explore available paths under given path. Args: path: Base path to explore (default: root) depth: Maximum depth to explore (None = unlimited) Returns: Dictionary with discovered paths and data """ result = self.get(path, data_type="all") if depth is not None and depth > 0: # Limit depth by filtering result # This is a simplified implementation pass return result def validate_path(self, path: str) -> bool: """ Check if path exists on device. Args: path: YANG path to validate Returns: True if path exists, False otherwise """ try: self.get(path) return True except GNMIPathError: return False except GNMIError: return False