feat(build): configure project tooling and update dependencies

- Replace placeholder deps with click, pygnmi, and rich for CLI functionality
- Add fabric-orch CLI entry point via project.scripts
- Configure hatchling as build backend with wheel/sdist targets
- Add ruff linter configuration for code quality
- Add dev dependency group with ruff
- Alphabetize yang module imports
This commit is contained in:
darnodo
2025-12-31 09:36:00 +01:00
parent 3e76eba46a
commit e041dab724
9 changed files with 1524 additions and 25 deletions

396
src/gnmi/client.py Normal file
View File

@@ -0,0 +1,396 @@
"""
gNMI Client Wrapper for Fabric Orchestrator.
This module provides a high-level gNMI client using pygnmi with:
- Connection management
- Async context manager support
- Error handling with meaningful exceptions
"""
from __future__ import annotations
import json
from dataclasses import dataclass
from typing import Any, Literal
from pygnmi.client import gNMIclient
# =============================================================================
# Exceptions
# =============================================================================
class GNMIError(Exception):
"""Base exception for gNMI operations."""
pass
class GNMIConnectionError(GNMIError):
"""Raised when connection to device fails."""
pass
class GNMIPathError(GNMIError):
"""Raised when path is invalid or not found."""
pass
# =============================================================================
# Data Types
# =============================================================================
DataType = Literal["config", "state", "all"]
SetOperation = Literal["update", "replace", "delete"]
SubscribeMode = Literal["once", "stream", "poll"]
StreamMode = Literal["on-change", "sample", "target-defined"]
@dataclass
class Capability:
"""Represents a gNMI capability/model."""
name: str
organization: str
version: str
def __str__(self) -> str:
return f"{self.name} ({self.organization}) v{self.version}"
@dataclass
class SubscriptionUpdate:
"""Represents a subscription update."""
path: str
value: Any
timestamp: int
# =============================================================================
# gNMI Client
# =============================================================================
class GNMIClient:
"""
gNMI Client wrapper using pygnmi.
Provides high-level methods for gNMI operations with proper
error handling and context manager support.
Usage:
async with GNMIClient(host="leaf1", port=6030, ...) as client:
caps = await client.capabilities()
Or synchronously:
with GNMIClient(host="leaf1", port=6030, ...) as client:
caps = client.capabilities()
"""
def __init__(
self,
host: str,
port: int = 6030,
username: str = "admin",
password: str = "admin",
insecure: bool = True,
skip_verify: bool = True,
timeout: int = 10,
):
"""
Initialize gNMI client.
Args:
host: Target hostname or IP
port: gNMI port (default: 6030)
username: Username for authentication
password: Password for authentication
insecure: Skip TLS verification (default: True for lab)
skip_verify: Skip certificate verification
timeout: Connection timeout in seconds
"""
self.host = host
self.port = port
self.username = username
self.password = password
self.insecure = insecure
self.skip_verify = skip_verify
self.timeout = timeout
self._client: gNMIclient | None = None
@property
def target(self) -> str:
"""Get target string (host:port)."""
return f"{self.host}:{self.port}"
def _ensure_connected(self) -> gNMIclient:
"""Ensure client is connected."""
if self._client is None:
raise GNMIConnectionError("Client not connected. Use context manager.")
return self._client
def __enter__(self) -> GNMIClient:
"""Enter context manager (sync)."""
try:
self._client = gNMIclient(
target=(self.host, self.port),
username=self.username,
password=self.password,
insecure=self.insecure,
skip_verify=self.skip_verify,
)
self._client.__enter__()
return self
except Exception as e:
raise GNMIConnectionError(
f"Failed to connect to {self.target}: {e}"
) from e
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""Exit context manager (sync)."""
if self._client:
try:
self._client.__exit__(exc_type, exc_val, exc_tb)
except Exception:
pass # Ignore cleanup errors
self._client = None
async def __aenter__(self) -> GNMIClient:
"""Enter async context manager."""
return self.__enter__()
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
"""Exit async context manager."""
self.__exit__(exc_type, exc_val, exc_tb)
# =========================================================================
# gNMI Operations
# =========================================================================
def capabilities(self) -> dict[str, Any]:
"""
Get device capabilities.
Returns:
Dictionary containing:
- gnmi_version: gNMI protocol version
- supported_models: List of supported YANG models
- supported_encodings: List of supported encodings
"""
client = self._ensure_connected()
try:
result = client.capabilities()
return result
except Exception as e:
raise GNMIError(f"Failed to get capabilities: {e}") from e
def get_models(self) -> list[Capability]:
"""
Get list of supported YANG models.
Returns:
List of Capability objects
"""
caps = self.capabilities()
models = []
for model in caps.get("supported_models", []):
models.append(
Capability(
name=model.get("name", ""),
organization=model.get("organization", ""),
version=model.get("version", ""),
)
)
return models
def get(
self,
path: str | list[str],
data_type: DataType = "all",
encoding: str = "json_ietf",
) -> dict[str, Any]:
"""
Get data at path.
Args:
path: YANG path or list of paths
data_type: Type of data to retrieve:
- "config": Configuration data only
- "state": State/operational data only
- "all": Both config and state (default)
encoding: Data encoding (default: json_ietf)
Returns:
Dictionary with path data
"""
client = self._ensure_connected()
paths = [path] if isinstance(path, str) else path
# Map data_type to pygnmi datatype parameter
datatype_map = {
"config": "config",
"state": "state",
"all": "all",
}
datatype = datatype_map.get(data_type, "all")
try:
result = client.get(path=paths, datatype=datatype, encoding=encoding)
return result
except Exception as e:
error_msg = str(e).lower()
if "not found" in error_msg or "invalid" in error_msg:
raise GNMIPathError(f"Path not found or invalid: {path}") from e
raise GNMIError(f"Failed to get data at {path}: {e}") from e
def set(
self,
path: str,
value: Any,
operation: SetOperation = "update",
encoding: str = "json_ietf",
dry_run: bool = True,
) -> dict[str, Any]:
"""
Set configuration at path.
Args:
path: YANG path
value: Value to set (dict or JSON string)
operation: Set operation type:
- "update": Merge with existing config
- "replace": Replace existing config
- "delete": Delete config at path
encoding: Data encoding (default: json_ietf)
dry_run: If True, only validate without applying (default: True)
Returns:
Result of set operation
"""
if dry_run:
# For dry-run, just return what would be set
return {
"dry_run": True,
"operation": operation,
"path": path,
"value": value,
"message": "Dry-run mode - no changes applied",
}
client = self._ensure_connected()
# Parse value if it's a JSON string
if isinstance(value, str):
try:
value = json.loads(value)
except json.JSONDecodeError:
pass # Keep as string if not valid JSON
try:
if operation == "delete":
result = client.set(delete=[path])
elif operation == "replace":
result = client.set(replace=[(path, value)])
else: # update
result = client.set(update=[(path, value)])
return result
except Exception as e:
raise GNMIError(f"Failed to set {path}: {e}") from e
def subscribe(
self,
paths: str | list[str],
mode: SubscribeMode = "stream",
stream_mode: StreamMode = "on-change",
sample_interval: int = 10,
encoding: str = "json_ietf",
) -> Any:
"""
Subscribe to path updates.
Args:
paths: YANG path(s) to subscribe to
mode: Subscription mode:
- "once": Get data once and close
- "stream": Continuous updates
- "poll": Poll on demand
stream_mode: Stream mode (for mode="stream"):
- "on-change": Update on value change
- "sample": Periodic sampling
- "target-defined": Device decides
sample_interval: Interval in seconds for sample mode
encoding: Data encoding
Yields:
Subscription updates
"""
client = self._ensure_connected()
path_list = [paths] if isinstance(paths, str) else paths
# Build subscription list
subscribe_list = []
for p in path_list:
sub = {
"path": p,
"mode": stream_mode.replace("-", "_"), # on-change -> on_change
}
if stream_mode == "sample":
sub["sample_interval"] = sample_interval * 1_000_000_000 # nanoseconds
subscribe_list.append(sub)
subscribe_request = {
"subscription": subscribe_list,
"mode": mode,
"encoding": encoding,
}
try:
return client.subscribe2(subscribe=subscribe_request)
except Exception as e:
raise GNMIError(f"Failed to subscribe to {paths}: {e}") from e
# =========================================================================
# Utility Methods
# =========================================================================
def explore(self, path: str = "/", depth: int | None = None) -> dict[str, Any]:
"""
Explore available paths under given path.
Args:
path: Base path to explore (default: root)
depth: Maximum depth to explore (None = unlimited)
Returns:
Dictionary with discovered paths and data
"""
result = self.get(path, data_type="all")
if depth is not None and depth > 0:
# Limit depth by filtering result
# This is a simplified implementation
pass
return result
def validate_path(self, path: str) -> bool:
"""
Check if path exists on device.
Args:
path: YANG path to validate
Returns:
True if path exists, False otherwise
"""
try:
self.get(path)
return True
except GNMIPathError:
return False
except GNMIError:
return False