diff --git a/pyproject.toml b/pyproject.toml index ab99bfe..ac4f87b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,6 +7,7 @@ requires-python = ">=3.12" dependencies = [ "click>=8.1.0", "pygnmi>=0.8.0", + "pynetbox>=7.0.0", "rich>=13.0.0", ] diff --git a/src/netbox/__init__.py b/src/netbox/__init__.py new file mode 100644 index 0000000..cf75abf --- /dev/null +++ b/src/netbox/__init__.py @@ -0,0 +1,36 @@ +""" +NetBox Client Module for Fabric Orchestrator. + +This module provides a NetBox API client for fetching fabric intent data +using pynetbox. + +Usage: + from src.netbox import FabricNetBoxClient + + client = FabricNetBoxClient( + url="http://netbox.local", + token="your-token" + ) + + # Get all leaf devices + leaves = client.get_leaf_devices() + + # Get complete intent for a device + intent = client.get_device_intent("leaf1") +""" + +from .client import ( + FabricNetBoxClient, + NetBoxAPIError, + NetBoxConnectionError, + NetBoxError, + NetBoxNotFoundError, +) + +__all__ = [ + "FabricNetBoxClient", + "NetBoxError", + "NetBoxConnectionError", + "NetBoxNotFoundError", + "NetBoxAPIError", +] \ No newline at end of file diff --git a/src/netbox/client.py b/src/netbox/client.py new file mode 100644 index 0000000..fceee26 --- /dev/null +++ b/src/netbox/client.py @@ -0,0 +1,755 @@ +""" +NetBox API Client for Fabric Orchestrator. + +This module provides a client for fetching fabric intent data from NetBox +using pynetbox. It retrieves device, interface, BGP, EVPN, and VRF configuration +to determine the desired network state. + +Usage: + from src.netbox import FabricNetBoxClient + + client = FabricNetBoxClient( + url="http://netbox.local", + token="your-token" + ) + + # Get all leaf devices + leaves = client.get_leaf_devices() + + # Get complete intent for a device + intent = client.get_device_intent("leaf1") +""" + +from __future__ import annotations + +import logging +import os +from typing import Any + +import pynetbox +from pynetbox.core.query import RequestError + +# ============================================================================= +# Exceptions +# ============================================================================= + + +class NetBoxError(Exception): + """Base exception for NetBox operations.""" + + pass + + +class NetBoxConnectionError(NetBoxError): + """Raised when connection to NetBox fails.""" + + pass + + +class NetBoxNotFoundError(NetBoxError): + """Raised when requested resource is not found.""" + + pass + + +class NetBoxAPIError(NetBoxError): + """Raised when NetBox API returns an error.""" + + pass + + +# ============================================================================= +# Logger +# ============================================================================= + +logger = logging.getLogger(__name__) + + +# ============================================================================= +# NetBox Client +# ============================================================================= + + +class FabricNetBoxClient: + """ + NetBox API client for fabric intent retrieval. + + Provides methods to fetch device, interface, BGP, EVPN, and VRF + configuration from NetBox to determine desired network state. + + Supports configuration via environment variables: + - NETBOX_URL: NetBox server URL + - NETBOX_TOKEN: API token for authentication + + Example: + client = FabricNetBoxClient( + url="http://netbox.local", + token="your-token" + ) + leaves = client.get_leaf_devices() + """ + + def __init__( + self, + url: str | None = None, + token: str | None = None, + threading: bool = False, + ): + """ + Initialize NetBox client. + + Args: + url: NetBox server URL (or NETBOX_URL env var) + token: API token (or NETBOX_TOKEN env var) + threading: Enable threading for requests (default: False) + + Raises: + NetBoxConnectionError: If URL or token is not provided + """ + self.url = url or os.environ.get("NETBOX_URL") + self.token = token or os.environ.get("NETBOX_TOKEN") + + if not self.url: + raise NetBoxConnectionError( + "NetBox URL not provided. Set NETBOX_URL environment variable or pass url parameter." + ) + if not self.token: + raise NetBoxConnectionError( + "NetBox token not provided. Set NETBOX_TOKEN environment variable or pass token parameter." + ) + + logger.debug(f"Connecting to NetBox at {self.url}") + + try: + self._api = pynetbox.api(self.url, token=self.token, threading=threading) + # Test connection by fetching API status + self._api.status() + logger.info(f"Successfully connected to NetBox at {self.url}") + except Exception as e: + raise NetBoxConnectionError(f"Failed to connect to NetBox at {self.url}: {e}") from e + + # ========================================================================= + # Device Methods + # ========================================================================= + + def get_device(self, name: str) -> dict[str, Any]: + """ + Get a device by name. + + Args: + name: Device name + + Returns: + Device data as dictionary + + Raises: + NetBoxNotFoundError: If device not found + """ + logger.debug(f"Fetching device: {name}") + try: + device = self._api.dcim.devices.get(name=name) + if not device: + raise NetBoxNotFoundError(f"Device not found: {name}") + return dict(device) + except RequestError as e: + raise NetBoxAPIError(f"Failed to fetch device {name}: {e}") from e + + def get_devices_by_role(self, role: str) -> list[dict[str, Any]]: + """ + Get all devices with a specific role. + + Args: + role: Device role slug (e.g., 'leaf', 'spine') + + Returns: + List of device dictionaries + """ + logger.debug(f"Fetching devices with role: {role}") + try: + devices = self._api.dcim.devices.filter(role=role) + return [dict(d) for d in devices] + except RequestError as e: + raise NetBoxAPIError(f"Failed to fetch devices with role {role}: {e}") from e + + def get_leaf_devices(self) -> list[dict[str, Any]]: + """ + Get all leaf devices. + + Returns: + List of leaf device dictionaries + """ + return self.get_devices_by_role("leaf") + + def get_spine_devices(self) -> list[dict[str, Any]]: + """ + Get all spine devices. + + Returns: + List of spine device dictionaries + """ + return self.get_devices_by_role("spine") + + def get_device_asn(self, device_name: str) -> int | None: + """ + Get ASN assigned to a device. + + Args: + device_name: Device name + + Returns: + ASN number or None if not assigned + """ + logger.debug(f"Fetching ASN for device: {device_name}") + try: + asns = self._api.ipam.asns.filter(device=device_name) + asn_list = list(asns) + if asn_list: + return asn_list[0].asn + return None + except RequestError as e: + raise NetBoxAPIError(f"Failed to fetch ASN for {device_name}: {e}") from e + + # ========================================================================= + # Interface Methods + # ========================================================================= + + def get_device_interfaces( + self, device_name: str, interface_type: str | None = None + ) -> list[dict[str, Any]]: + """ + Get interfaces for a device. + + Args: + device_name: Device name + interface_type: Optional interface type filter (e.g., 'virtual', 'lag', '1000base-t') + + Returns: + List of interface dictionaries + """ + logger.debug(f"Fetching interfaces for device: {device_name}, type: {interface_type}") + try: + filters = {"device": device_name} + if interface_type: + filters["type"] = interface_type + interfaces = self._api.dcim.interfaces.filter(**filters) + return [dict(i) for i in interfaces] + except RequestError as e: + raise NetBoxAPIError(f"Failed to fetch interfaces for {device_name}: {e}") from e + + def get_device_loopbacks(self, device_name: str) -> list[dict[str, Any]]: + """ + Get loopback interfaces for a device. + + Args: + device_name: Device name + + Returns: + List of loopback interface dictionaries + """ + logger.debug(f"Fetching loopbacks for device: {device_name}") + try: + interfaces = self._api.dcim.interfaces.filter(device=device_name, type="virtual") + loopbacks = [dict(i) for i in interfaces if i.name.lower().startswith("loopback")] + return loopbacks + except RequestError as e: + raise NetBoxAPIError(f"Failed to fetch loopbacks for {device_name}: {e}") from e + + def get_device_lags(self, device_name: str) -> list[dict[str, Any]]: + """ + Get LAG/Port-Channel interfaces for a device. + + Args: + device_name: Device name + + Returns: + List of LAG interface dictionaries + """ + logger.debug(f"Fetching LAGs for device: {device_name}") + return self.get_device_interfaces(device_name, interface_type="lag") + + def get_device_svis(self, device_name: str) -> list[dict[str, Any]]: + """ + Get SVI (VLAN) interfaces for a device. + + Args: + device_name: Device name + + Returns: + List of SVI interface dictionaries + """ + logger.debug(f"Fetching SVIs for device: {device_name}") + try: + interfaces = self._api.dcim.interfaces.filter(device=device_name, type="virtual") + svis = [dict(i) for i in interfaces if i.name.lower().startswith("vlan")] + return svis + except RequestError as e: + raise NetBoxAPIError(f"Failed to fetch SVIs for {device_name}: {e}") from e + + # ========================================================================= + # IP Address Methods + # ========================================================================= + + def get_interface_ips( + self, device_name: str, interface_name: str + ) -> list[dict[str, Any]]: + """ + Get IP addresses assigned to an interface. + + Args: + device_name: Device name + interface_name: Interface name + + Returns: + List of IP address dictionaries + """ + logger.debug(f"Fetching IPs for {device_name}:{interface_name}") + try: + ips = self._api.ipam.ip_addresses.filter( + device=device_name, interface=interface_name + ) + return [dict(ip) for ip in ips] + except RequestError as e: + raise NetBoxAPIError( + f"Failed to fetch IPs for {device_name}:{interface_name}: {e}" + ) from e + + def get_device_ips(self, device_name: str) -> list[dict[str, Any]]: + """ + Get all IP addresses for a device. + + Args: + device_name: Device name + + Returns: + List of IP address dictionaries with interface info + """ + logger.debug(f"Fetching all IPs for device: {device_name}") + try: + ips = self._api.ipam.ip_addresses.filter(device=device_name) + return [dict(ip) for ip in ips] + except RequestError as e: + raise NetBoxAPIError(f"Failed to fetch IPs for {device_name}: {e}") from e + + # ========================================================================= + # BGP Methods (Plugin) + # ========================================================================= + + def get_bgp_sessions(self, device_name: str | None = None) -> list[dict[str, Any]]: + """ + Get BGP sessions from NetBox BGP plugin. + + Args: + device_name: Optional device name filter + + Returns: + List of BGP session dictionaries + """ + logger.debug(f"Fetching BGP sessions for device: {device_name or 'all'}") + try: + if device_name: + sessions = self._api.plugins.bgp.sessions.filter(device=device_name) + else: + sessions = self._api.plugins.bgp.sessions.all() + return [dict(s) for s in sessions] + except RequestError as e: + raise NetBoxAPIError(f"Failed to fetch BGP sessions: {e}") from e + except AttributeError: + logger.warning("NetBox BGP plugin not available") + return [] + + def get_bgp_peer_groups(self, device_name: str | None = None) -> list[dict[str, Any]]: + """ + Get BGP peer groups from NetBox BGP plugin. + + Args: + device_name: Optional device name filter + + Returns: + List of BGP peer group dictionaries + """ + logger.debug(f"Fetching BGP peer groups for device: {device_name or 'all'}") + try: + if device_name: + peer_groups = self._api.plugins.bgp.peer_groups.filter(device=device_name) + else: + peer_groups = self._api.plugins.bgp.peer_groups.all() + return [dict(pg) for pg in peer_groups] + except RequestError as e: + raise NetBoxAPIError(f"Failed to fetch BGP peer groups: {e}") from e + except AttributeError: + logger.warning("NetBox BGP plugin not available") + return [] + + # ========================================================================= + # VLAN Methods + # ========================================================================= + + def get_vlans(self, site: str | None = None) -> list[dict[str, Any]]: + """ + Get VLANs. + + Args: + site: Optional site filter + + Returns: + List of VLAN dictionaries + """ + logger.debug(f"Fetching VLANs for site: {site or 'all'}") + try: + if site: + vlans = self._api.ipam.vlans.filter(site=site) + else: + vlans = self._api.ipam.vlans.all() + return [dict(v) for v in vlans] + except RequestError as e: + raise NetBoxAPIError(f"Failed to fetch VLANs: {e}") from e + + def get_vlan(self, vlan_id: int, site: str | None = None) -> dict[str, Any] | None: + """ + Get a specific VLAN by ID. + + Args: + vlan_id: VLAN ID + site: Optional site filter + + Returns: + VLAN dictionary or None if not found + """ + logger.debug(f"Fetching VLAN {vlan_id}") + try: + filters = {"vid": vlan_id} + if site: + filters["site"] = site + vlans = self._api.ipam.vlans.filter(**filters) + vlan_list = list(vlans) + if vlan_list: + return dict(vlan_list[0]) + return None + except RequestError as e: + raise NetBoxAPIError(f"Failed to fetch VLAN {vlan_id}: {e}") from e + + # ========================================================================= + # L2VPN/EVPN Methods + # ========================================================================= + + def get_l2vpns(self, l2vpn_type: str = "evpn") -> list[dict[str, Any]]: + """ + Get L2VPNs (EVPN instances). + + Args: + l2vpn_type: L2VPN type filter (default: 'evpn') + + Returns: + List of L2VPN dictionaries with VNI identifiers + """ + logger.debug(f"Fetching L2VPNs of type: {l2vpn_type}") + try: + l2vpns = self._api.vpn.l2vpns.filter(type=l2vpn_type) + return [dict(l2vpn) for l2vpn in l2vpns] + except RequestError as e: + raise NetBoxAPIError(f"Failed to fetch L2VPNs: {e}") from e + + def get_l2vpn_terminations( + self, device_name: str | None = None, l2vpn_name: str | None = None + ) -> list[dict[str, Any]]: + """ + Get L2VPN terminations (VLAN-to-device mappings). + + Args: + device_name: Optional device name filter + l2vpn_name: Optional L2VPN name filter + + Returns: + List of L2VPN termination dictionaries + """ + logger.debug( + f"Fetching L2VPN terminations for device: {device_name or 'all'}, " + f"l2vpn: {l2vpn_name or 'all'}" + ) + try: + filters = {} + if l2vpn_name: + filters["l2vpn"] = l2vpn_name + if device_name: + filters["device"] = device_name + + if filters: + terminations = self._api.vpn.l2vpn_terminations.filter(**filters) + else: + terminations = self._api.vpn.l2vpn_terminations.all() + return [dict(t) for t in terminations] + except RequestError as e: + raise NetBoxAPIError(f"Failed to fetch L2VPN terminations: {e}") from e + + def get_vni_for_vlan(self, vlan_id: int) -> int | None: + """ + Get VNI for a VLAN from L2VPN configuration. + + Args: + vlan_id: VLAN ID + + Returns: + VNI number or None if not mapped + """ + logger.debug(f"Fetching VNI for VLAN {vlan_id}") + try: + # Get L2VPN terminations for this VLAN + terminations = self._api.vpn.l2vpn_terminations.filter(vlan_id=vlan_id) + term_list = list(terminations) + if term_list: + l2vpn = term_list[0].l2vpn + if l2vpn and hasattr(l2vpn, "identifier"): + return l2vpn.identifier + return None + except RequestError as e: + raise NetBoxAPIError(f"Failed to fetch VNI for VLAN {vlan_id}: {e}") from e + + # ========================================================================= + # VRF Methods + # ========================================================================= + + def get_vrfs(self) -> list[dict[str, Any]]: + """ + Get all VRFs with route targets. + + Returns: + List of VRF dictionaries including import/export route targets + """ + logger.debug("Fetching all VRFs") + try: + vrfs = self._api.ipam.vrfs.all() + return [dict(v) for v in vrfs] + except RequestError as e: + raise NetBoxAPIError(f"Failed to fetch VRFs: {e}") from e + + def get_vrf(self, name: str) -> dict[str, Any] | None: + """ + Get a VRF by name. + + Args: + name: VRF name + + Returns: + VRF dictionary or None if not found + """ + logger.debug(f"Fetching VRF: {name}") + try: + vrf = self._api.ipam.vrfs.get(name=name) + if vrf: + return dict(vrf) + return None + except RequestError as e: + raise NetBoxAPIError(f"Failed to fetch VRF {name}: {e}") from e + + def get_device_vrf_interfaces( + self, device_name: str, vrf_name: str + ) -> list[dict[str, Any]]: + """ + Get interfaces assigned to a VRF on a device. + + Args: + device_name: Device name + vrf_name: VRF name + + Returns: + List of interface dictionaries assigned to the VRF + """ + logger.debug(f"Fetching VRF {vrf_name} interfaces for device: {device_name}") + try: + interfaces = self._api.dcim.interfaces.filter(device=device_name, vrf=vrf_name) + return [dict(i) for i in interfaces] + except RequestError as e: + raise NetBoxAPIError( + f"Failed to fetch VRF interfaces for {device_name}/{vrf_name}: {e}" + ) from e + + def get_route_targets(self) -> list[dict[str, Any]]: + """ + Get all route targets. + + Returns: + List of route target dictionaries + """ + logger.debug("Fetching all route targets") + try: + rts = self._api.ipam.route_targets.all() + return [dict(rt) for rt in rts] + except RequestError as e: + raise NetBoxAPIError(f"Failed to fetch route targets: {e}") from e + + # ========================================================================= + # MLAG Methods (Custom Fields) + # ========================================================================= + + def get_device_mlag_config(self, device_name: str) -> dict[str, Any]: + """ + Get MLAG configuration from device custom fields. + + Custom fields expected: + - mlag_domain_id: MLAG domain identifier + - mlag_peer_address: MLAG peer IP address + - mlag_local_address: MLAG local IP address + - mlag_virtual_mac: Shared virtual-router MAC + + Args: + device_name: Device name + + Returns: + MLAG configuration dictionary + """ + logger.debug(f"Fetching MLAG config for device: {device_name}") + device = self.get_device(device_name) + custom_fields = device.get("custom_fields", {}) or {} + + return { + "domain_id": custom_fields.get("mlag_domain_id"), + "peer_address": custom_fields.get("mlag_peer_address"), + "local_address": custom_fields.get("mlag_local_address"), + "virtual_mac": custom_fields.get("mlag_virtual_mac"), + } + + def get_mlag_peer_link(self, device_name: str) -> dict[str, Any] | None: + """ + Get MLAG peer-link interface for a device. + + Looks for interface with mlag_peer_link custom field set to True. + + Args: + device_name: Device name + + Returns: + Peer-link interface dictionary or None + """ + logger.debug(f"Fetching MLAG peer-link for device: {device_name}") + try: + interfaces = self._api.dcim.interfaces.filter(device=device_name) + for iface in interfaces: + cf = iface.custom_fields or {} + if cf.get("mlag_peer_link"): + return dict(iface) + return None + except RequestError as e: + raise NetBoxAPIError( + f"Failed to fetch MLAG peer-link for {device_name}: {e}" + ) from e + + # ========================================================================= + # Aggregated Intent Methods + # ========================================================================= + + def get_device_intent(self, device_name: str) -> dict[str, Any]: + """ + Get complete fabric intent for a single device. + + Aggregates all relevant configuration data for a device including: + - Device info and custom fields + - Interfaces (physical, loopback, LAG, SVI) + - IP addresses + - BGP sessions and peer groups + - VLAN assignments + - L2VPN/EVPN terminations + - VRF assignments + - MLAG configuration + + Args: + device_name: Device name + + Returns: + Complete device intent dictionary + """ + logger.info(f"Building complete intent for device: {device_name}") + + # Get device info + device = self.get_device(device_name) + + # Get interfaces + all_interfaces = self.get_device_interfaces(device_name) + loopbacks = self.get_device_loopbacks(device_name) + lags = self.get_device_lags(device_name) + svis = self.get_device_svis(device_name) + + # Get IP addresses + ip_addresses = self.get_device_ips(device_name) + + # Get ASN + asn = self.get_device_asn(device_name) + + # Get BGP configuration + bgp_sessions = self.get_bgp_sessions(device_name) + bgp_peer_groups = self.get_bgp_peer_groups(device_name) + + # Get L2VPN terminations for this device + l2vpn_terminations = self.get_l2vpn_terminations(device_name=device_name) + + # Get MLAG configuration + mlag_config = self.get_device_mlag_config(device_name) + mlag_peer_link = self.get_mlag_peer_link(device_name) + + # Build intent dictionary + intent = { + "device": { + "name": device.get("name"), + "role": device.get("role", {}).get("slug") if device.get("role") else None, + "platform": ( + device.get("platform", {}).get("slug") if device.get("platform") else None + ), + "site": device.get("site", {}).get("slug") if device.get("site") else None, + "custom_fields": device.get("custom_fields", {}), + }, + "asn": asn, + "interfaces": { + "all": all_interfaces, + "loopbacks": loopbacks, + "lags": lags, + "svis": svis, + }, + "ip_addresses": ip_addresses, + "bgp": { + "sessions": bgp_sessions, + "peer_groups": bgp_peer_groups, + }, + "l2vpn_terminations": l2vpn_terminations, + "mlag": { + "config": mlag_config, + "peer_link": mlag_peer_link, + }, + } + + logger.debug(f"Built intent for {device_name} with {len(all_interfaces)} interfaces") + return intent + + def get_fabric_intent(self) -> dict[str, Any]: + """ + Get complete fabric intent for all devices. + + Returns: + Dictionary with fabric-wide intent including: + - All devices with their intent + - Fabric-wide VLANs + - Fabric-wide L2VPNs + - Fabric-wide VRFs + """ + logger.info("Building complete fabric intent") + + # Get all devices + spine_devices = self.get_spine_devices() + leaf_devices = self.get_leaf_devices() + + # Get fabric-wide configuration + vlans = self.get_vlans() + l2vpns = self.get_l2vpns() + vrfs = self.get_vrfs() + + # Build device intents + device_intents = {} + for device in spine_devices + leaf_devices: + name = device.get("name") + if name: + device_intents[name] = self.get_device_intent(name) + + return { + "devices": device_intents, + "vlans": vlans, + "l2vpns": l2vpns, + "vrfs": vrfs, + }