diff --git a/src/gnmi_eos/cli.py b/src/gnmi_eos/cli.py new file mode 100644 index 0000000..e92d80e --- /dev/null +++ b/src/gnmi_eos/cli.py @@ -0,0 +1,378 @@ +""" +gnmi-eos CLI. + +Commands: +- capabilities: List device capabilities and supported YANG models +- get: Get configuration or state data at a YANG path +- set: Set configuration at a YANG path +- subscribe: Subscribe to YANG path updates +- paths: Show validated YANG paths for Arista EOS +""" + +from __future__ import annotations + +import json +import sys +from pathlib import Path +from typing import Any + +import click +from rich.console import Console +from rich.json import JSON +from rich.panel import Panel +from rich.table import Table + +from gnmi_eos.client import GNMIClient, GNMIConnectionError, GNMIError, GNMIPathError + +# ============================================================================== +# Constants +# ============================================================================== + +console = Console() +error_console = Console(stderr=True) + +ENV_TARGET = "GNMI_TARGET" +ENV_USERNAME = "GNMI_USERNAME" +ENV_PASSWORD = "GNMI_PASSWORD" + +# ============================================================================== +# Helpers +# ============================================================================== + + +def output_result(data: Any, output_format: str = "pretty", title: str | None = None) -> None: + """Output result in specified format (pretty / json / file:).""" + json_str = json.dumps(data, indent=2, default=str) + + if output_format == "json": + click.echo(json_str) + elif output_format.startswith("file:"): + file_path = output_format[5:] + Path(file_path).write_text(json_str) + console.print(f"[green]✓[/green] Results saved to [bold]{file_path}[/bold]") + else: + if title: + console.print(Panel(JSON(json_str), title=title, border_style="blue")) + else: + console.print(JSON(json_str)) + + +def parse_target(target: str) -> tuple[str, int]: + """Parse target string into host and port.""" + if ":" in target: + host, port_str = target.rsplit(":", 1) + return host, int(port_str) + return target, 6030 + + +def handle_error(e: Exception, context: str = "") -> None: + """Handle and display error, then exit.""" + if isinstance(e, GNMIConnectionError): + error_console.print(f"[red]✗ Connection Error:[/red] {e}") + error_console.print( + "\n[dim]Hints:[/dim]\n" + " • Check if the target is reachable\n" + " • Verify gNMI is enabled on the device\n" + " • Check credentials\n" + " • Try --insecure flag for self-signed certs" + ) + elif isinstance(e, GNMIPathError): + error_console.print(f"[red]✗ Path Error:[/red] {e}") + error_console.print( + "\n[dim]Hints:[/dim]\n" + " • Use 'gnmi-eos capabilities' to see supported models\n" + " • Check path syntax (e.g., /interfaces/interface[name=Ethernet1])\n" + " • Try a parent path first" + ) + elif isinstance(e, GNMIError): + error_console.print(f"[red]✗ gNMI Error:[/red] {e}") + else: + error_console.print(f"[red]✗ Error:[/red] {e}") + + if context: + error_console.print(f"[dim]Context: {context}[/dim]") + + sys.exit(1) + + +def common_options(f): + """Common connection options shared by all commands.""" + f = click.option("--target", "-t", envvar=ENV_TARGET, required=True, help="Target device (host:port). Env: GNMI_TARGET")(f) + f = click.option("--username", "-u", envvar=ENV_USERNAME, default="admin", help="Username. Env: GNMI_USERNAME")(f) + f = click.option("--password", "-p", envvar=ENV_PASSWORD, default="admin", help="Password. Env: GNMI_PASSWORD")(f) + f = click.option("--insecure/--no-insecure", default=True, help="Skip TLS verification (default: True for lab)")(f) + f = click.option("--output", "-o", default="pretty", help="Output format: pretty, json, or file:")(f) + return f + + +# ============================================================================== +# CLI +# ============================================================================== + + +@click.group() +@click.version_option(version="0.1.0", prog_name="gnmi-eos") +def cli(): + """ + gnmi-eos — gNMI tool for Arista EOS devices. + + Get/set configuration, subscribe to counters, explore YANG paths. + """ + pass + + +# ============================================================================== +# capabilities +# ============================================================================== + + +@cli.command("capabilities") +@common_options +def cmd_capabilities(target: str, username: str, password: str, insecure: bool, output: str): + """List device capabilities and supported YANG models. + + Example: + + gnmi-eos capabilities --target leaf1:6030 + """ + host, port = parse_target(target) + + try: + with GNMIClient(host=host, port=port, username=username, password=password, insecure=insecure) as client: + caps = client.capabilities() + + if output == "pretty": + console.print(f"\n[bold blue]Device Capabilities[/bold blue] - {target}\n") + console.print(f"gNMI Version: [green]{caps.get('gnmi_version', 'unknown')}[/green]") + encodings = caps.get("supported_encodings", []) + console.print(f"Supported Encodings: [cyan]{', '.join(encodings)}[/cyan]") + + models = caps.get("supported_models", []) + if models: + console.print(f"\n[bold]Supported Models ({len(models)}):[/bold]\n") + table = Table(show_header=True, header_style="bold") + table.add_column("Model Name", style="cyan") + table.add_column("Organization") + table.add_column("Version", style="green") + for m in sorted(models, key=lambda x: x.get("name", "")): + table.add_row(m.get("name", ""), m.get("organization", ""), m.get("version", "")) + console.print(table) + else: + output_result(caps, output, "Device Capabilities") + + except Exception as e: + handle_error(e, f"Getting capabilities from {target}") + + +# ============================================================================== +# get +# ============================================================================== + + +@cli.command("get") +@common_options +@click.option("--path", "-P", required=True, help="YANG path to get data from") +@click.option("--type", "-T", "data_type", type=click.Choice(["config", "state", "all"]), default="all", help="Type of data to retrieve (default: all)") +def cmd_get(target: str, username: str, password: str, insecure: bool, output: str, path: str, data_type: str): + """Get configuration or state data at a YANG path. + + Examples: + + gnmi-eos get --target leaf1:6030 --path "/interfaces/interface[name=Ethernet1]/state" + + gnmi-eos get --target leaf1:6030 --path "/interfaces" --type config + + gnmi-eos get --target leaf1:6030 --path "/" --output file:dump.json + """ + host, port = parse_target(target) + + try: + with GNMIClient(host=host, port=port, username=username, password=password, insecure=insecure) as client: + result = client.get(path, data_type=data_type) + + if output == "pretty": + console.print(f"\n[bold blue]GET {path}[/bold blue] ({data_type})\n") + output_result(result, "pretty") + else: + output_result(result, output, f"GET {path}") + + except Exception as e: + handle_error(e, f"Getting data at {path}") + + +# ============================================================================== +# set +# ============================================================================== + + +@cli.command("set") +@common_options +@click.option("--path", "-P", required=True, help="YANG path to set") +@click.option("--value", "-v", required=True, help="Value to set (JSON string or simple value)") +@click.option("--operation", "-O", type=click.Choice(["update", "replace", "delete"]), default="update", help="Set operation type (default: update)") +@click.option("--dry-run/--no-dry-run", default=True, help="Dry-run mode — don't apply changes (default: True)") +def cmd_set(target: str, username: str, password: str, insecure: bool, output: str, path: str, value: str, operation: str, dry_run: bool): + """Set configuration at a YANG path. + + Dry-run is enabled by default. Use --no-dry-run to actually apply. + + Examples: + + gnmi-eos set --target leaf1:6030 \\ + --path "/interfaces/interface[name=Ethernet1]/config/description" \\ + --value "Uplink to Spine" + + gnmi-eos set --target leaf1:6030 \\ + --path "/interfaces/interface[name=Ethernet1]/config/description" \\ + --value "Uplink to Spine" --no-dry-run + """ + host, port = parse_target(target) + + if dry_run: + console.print("[yellow]⚠ DRY-RUN MODE[/yellow] - No changes will be applied\n") + else: + console.print("[red]⚠ LIVE MODE[/red] - Changes WILL be applied!\n") + + try: + with GNMIClient(host=host, port=port, username=username, password=password, insecure=insecure) as client: + result = client.set(path=path, value=value, operation=operation, dry_run=dry_run) + + if output == "pretty": + if dry_run: + console.print(f"[bold blue]SET (dry-run)[/bold blue] {path}\n") + console.print(f"Operation: [cyan]{operation}[/cyan]") + console.print(f"Value: [green]{value}[/green]") + console.print("\n[dim]Use --no-dry-run to apply this change[/dim]") + else: + console.print(f"[bold green]SET (applied)[/bold green] {path}\n") + output_result(result, "pretty") + else: + output_result(result, output, f"SET {path}") + + except Exception as e: + handle_error(e, f"Setting data at {path}") + + +# ============================================================================== +# subscribe +# ============================================================================== + + +@cli.command("subscribe") +@common_options +@click.option("--path", "-P", required=True, multiple=True, help="YANG path(s) to subscribe to (can specify multiple)") +@click.option("--mode", "-m", type=click.Choice(["on-change", "sample", "target-defined"]), default="on-change", help="Subscription mode (default: on-change)") +@click.option("--interval", "-i", type=int, default=10, help="Sample interval in seconds (for sample mode)") +@click.option("--count", "-c", type=int, default=None, help="Number of updates to receive before exiting") +def cmd_subscribe(target: str, username: str, password: str, insecure: bool, output: str, path: tuple[str, ...], mode: str, interval: int, count: int | None): + """Subscribe to YANG path updates. + + Note: For Arista EOS, use native paths WITHOUT module prefixes + for ON_CHANGE subscriptions. + + Examples: + + gnmi-eos subscribe --target leaf1:6030 \\ + --path "/interfaces/interface/state/oper-status" --mode on-change + + gnmi-eos subscribe --target leaf1:6030 \\ + --path "/interfaces/interface[name=Ethernet1]/state/counters" \\ + --mode sample --interval 10 --count 5 + """ + host, port = parse_target(target) + paths = list(path) + + console.print(f"\n[bold blue]Subscribing to {len(paths)} path(s)[/bold blue]\n") + for p in paths: + console.print(f" • {p}") + console.print(f"\nMode: [cyan]{mode}[/cyan]") + if mode == "sample": + console.print(f"Interval: [cyan]{interval}s[/cyan]") + console.print("\n[dim]Press Ctrl+C to stop[/dim]\n") + console.print("─" * 60) + + try: + with GNMIClient(host=host, port=port, username=username, password=password, insecure=insecure) as client: + subscription = client.subscribe(paths=paths, mode="stream", stream_mode=mode, sample_interval=interval) + + update_count = 0 + try: + for update in subscription: + update_count += 1 + + if output == "pretty": + console.print(f"\n[green]Update #{update_count}[/green]") + output_result(update, "pretty") + else: + output_result(update, output) + + if count is not None and update_count >= count: + console.print(f"\n[yellow]Reached {count} updates, stopping[/yellow]") + break + + except KeyboardInterrupt: + console.print(f"\n\n[yellow]Stopped after {update_count} updates[/yellow]") + + except Exception as e: + handle_error(e, f"Subscribing to {paths}") + + +# ============================================================================== +# paths +# ============================================================================== + + +@cli.command("paths") +def cmd_paths(): + """Show validated YANG paths for Arista EOS 4.35.0F. + + Reference of paths usable with the get, set, and subscribe commands. + """ + console.print("\n[bold blue]Validated YANG Paths — Arista EOS 4.35.0F[/bold blue]\n") + + table = Table(show_header=True, header_style="bold") + table.add_column("Category", style="cyan") + table.add_column("Path") + table.add_column("Notes", style="dim") + + rows = [ + ("Interfaces", "/interfaces/interface[name=Ethernet1]/state", "Full interface state"), + ("Interfaces", "/interfaces/interface[name=Ethernet1]/state/oper-status", "Operational status"), + ("Interfaces", "/interfaces/interface[name=Ethernet1]/state/counters", "Interface counters"), + ("Loopbacks", "/interfaces/interface[name=Loopback0]", "Loopback interface"), + ("VLANs", "/network-instances/network-instance[name=default]/vlans/vlan", "All VLANs"), + ("BGP", "/network-instances/network-instance[name=default]/protocols/protocol[identifier=BGP][name=BGP]/bgp/neighbors/neighbor/state", "All BGP neighbors"), + ("VXLAN", "/interfaces/interface[name=Vxlan1]/arista-vxlan/vlan-to-vnis", "VLAN-to-VNI mappings"), + ("VXLAN", "/interfaces/interface[name=Vxlan1]/arista-vxlan/config", "VXLAN config"), + ("MLAG", "/arista/eos/mlag/config", "MLAG config (config only via gNMI)"), + ("EVPN", "/arista/eos/evpn", "EVPN config (config only via gNMI)"), + ("System", "/system/config/hostname", "System hostname"), + ] + + for category, path, notes in rows: + table.add_row(category, path, notes) + + console.print(table) + + console.print("\n[bold]Path Syntax Tips:[/bold]") + console.print(" • Use brackets for keys: [name=Ethernet1]") + console.print(" • Multiple keys: [identifier=BGP][name=BGP]") + console.print(" • Wildcards not supported in GET, but can omit keys for all") + console.print("\n[bold]Subscription Notes:[/bold]") + console.print(" • Use native paths WITHOUT module prefix for ON_CHANGE") + console.print(" ✅ /interfaces/interface[name=Ethernet1]/state") + console.print(" ❌ /openconfig-interfaces:interfaces/...") + + +# ============================================================================== +# Entry Point +# ============================================================================== + + +def main(): + """Main entry point.""" + cli() + + +if __name__ == "__main__": + main()