feat(netbox): Add NetBox API client with v4.4 compatibility #22

Merged
Damien merged 5 commits from feat/netbox-data-model into main 2026-01-09 14:26:03 +00:00
3 changed files with 792 additions and 0 deletions
Showing only changes of commit 4799c4cbf2 - Show all commits

View File

@@ -7,6 +7,7 @@ requires-python = ">=3.12"
dependencies = [
"click>=8.1.0",
"pygnmi>=0.8.0",
"pynetbox>=7.0.0",
"rich>=13.0.0",
]

36
src/netbox/__init__.py Normal file
View File

@@ -0,0 +1,36 @@
"""
NetBox Client Module for Fabric Orchestrator.
This module provides a NetBox API client for fetching fabric intent data
using pynetbox.
Usage:
from src.netbox import FabricNetBoxClient
client = FabricNetBoxClient(
url="http://netbox.local",
token="your-token"
)
# Get all leaf devices
leaves = client.get_leaf_devices()
# Get complete intent for a device
intent = client.get_device_intent("leaf1")
"""
from .client import (
FabricNetBoxClient,
NetBoxAPIError,
NetBoxConnectionError,
NetBoxError,
NetBoxNotFoundError,
)
__all__ = [
"FabricNetBoxClient",
"NetBoxError",
"NetBoxConnectionError",
"NetBoxNotFoundError",
"NetBoxAPIError",
]

755
src/netbox/client.py Normal file
View File

@@ -0,0 +1,755 @@
"""
NetBox API Client for Fabric Orchestrator.
This module provides a client for fetching fabric intent data from NetBox
using pynetbox. It retrieves device, interface, BGP, EVPN, and VRF configuration
to determine the desired network state.
Usage:
from src.netbox import FabricNetBoxClient
client = FabricNetBoxClient(
url="http://netbox.local",
token="your-token"
)
# Get all leaf devices
leaves = client.get_leaf_devices()
# Get complete intent for a device
intent = client.get_device_intent("leaf1")
"""
from __future__ import annotations
import logging
import os
from typing import Any
import pynetbox
from pynetbox.core.query import RequestError
# =============================================================================
# Exceptions
# =============================================================================
class NetBoxError(Exception):
"""Base exception for NetBox operations."""
pass
class NetBoxConnectionError(NetBoxError):
"""Raised when connection to NetBox fails."""
pass
class NetBoxNotFoundError(NetBoxError):
"""Raised when requested resource is not found."""
pass
class NetBoxAPIError(NetBoxError):
"""Raised when NetBox API returns an error."""
pass
# =============================================================================
# Logger
# =============================================================================
logger = logging.getLogger(__name__)
# =============================================================================
# NetBox Client
# =============================================================================
class FabricNetBoxClient:
"""
NetBox API client for fabric intent retrieval.
Provides methods to fetch device, interface, BGP, EVPN, and VRF
configuration from NetBox to determine desired network state.
Supports configuration via environment variables:
- NETBOX_URL: NetBox server URL
- NETBOX_TOKEN: API token for authentication
Example:
client = FabricNetBoxClient(
url="http://netbox.local",
token="your-token"
)
leaves = client.get_leaf_devices()
"""
def __init__(
self,
url: str | None = None,
token: str | None = None,
threading: bool = False,
):
"""
Initialize NetBox client.
Args:
url: NetBox server URL (or NETBOX_URL env var)
token: API token (or NETBOX_TOKEN env var)
threading: Enable threading for requests (default: False)
Raises:
NetBoxConnectionError: If URL or token is not provided
"""
self.url = url or os.environ.get("NETBOX_URL")
self.token = token or os.environ.get("NETBOX_TOKEN")
if not self.url:
raise NetBoxConnectionError(
"NetBox URL not provided. Set NETBOX_URL environment variable or pass url parameter."
)
if not self.token:
raise NetBoxConnectionError(
"NetBox token not provided. Set NETBOX_TOKEN environment variable or pass token parameter."
)
logger.debug(f"Connecting to NetBox at {self.url}")
try:
self._api = pynetbox.api(self.url, token=self.token, threading=threading)
# Test connection by fetching API status
self._api.status()
logger.info(f"Successfully connected to NetBox at {self.url}")
except Exception as e:
raise NetBoxConnectionError(f"Failed to connect to NetBox at {self.url}: {e}") from e
# =========================================================================
# Device Methods
# =========================================================================
def get_device(self, name: str) -> dict[str, Any]:
"""
Get a device by name.
Args:
name: Device name
Returns:
Device data as dictionary
Raises:
NetBoxNotFoundError: If device not found
"""
logger.debug(f"Fetching device: {name}")
try:
device = self._api.dcim.devices.get(name=name)
if not device:
raise NetBoxNotFoundError(f"Device not found: {name}")
return dict(device)
except RequestError as e:
raise NetBoxAPIError(f"Failed to fetch device {name}: {e}") from e
def get_devices_by_role(self, role: str) -> list[dict[str, Any]]:
"""
Get all devices with a specific role.
Args:
role: Device role slug (e.g., 'leaf', 'spine')
Returns:
List of device dictionaries
"""
logger.debug(f"Fetching devices with role: {role}")
try:
devices = self._api.dcim.devices.filter(role=role)
return [dict(d) for d in devices]
except RequestError as e:
raise NetBoxAPIError(f"Failed to fetch devices with role {role}: {e}") from e
def get_leaf_devices(self) -> list[dict[str, Any]]:
"""
Get all leaf devices.
Returns:
List of leaf device dictionaries
"""
return self.get_devices_by_role("leaf")
def get_spine_devices(self) -> list[dict[str, Any]]:
"""
Get all spine devices.
Returns:
List of spine device dictionaries
"""
return self.get_devices_by_role("spine")
def get_device_asn(self, device_name: str) -> int | None:
"""
Get ASN assigned to a device.
Args:
device_name: Device name
Returns:
ASN number or None if not assigned
"""
logger.debug(f"Fetching ASN for device: {device_name}")
try:
asns = self._api.ipam.asns.filter(device=device_name)
asn_list = list(asns)
if asn_list:
return asn_list[0].asn
return None
except RequestError as e:
raise NetBoxAPIError(f"Failed to fetch ASN for {device_name}: {e}") from e
# =========================================================================
# Interface Methods
# =========================================================================
def get_device_interfaces(
self, device_name: str, interface_type: str | None = None
) -> list[dict[str, Any]]:
"""
Get interfaces for a device.
Args:
device_name: Device name
interface_type: Optional interface type filter (e.g., 'virtual', 'lag', '1000base-t')
Returns:
List of interface dictionaries
"""
logger.debug(f"Fetching interfaces for device: {device_name}, type: {interface_type}")
try:
filters = {"device": device_name}
if interface_type:
filters["type"] = interface_type
interfaces = self._api.dcim.interfaces.filter(**filters)
return [dict(i) for i in interfaces]
except RequestError as e:
raise NetBoxAPIError(f"Failed to fetch interfaces for {device_name}: {e}") from e
def get_device_loopbacks(self, device_name: str) -> list[dict[str, Any]]:
"""
Get loopback interfaces for a device.
Args:
device_name: Device name
Returns:
List of loopback interface dictionaries
"""
logger.debug(f"Fetching loopbacks for device: {device_name}")
try:
interfaces = self._api.dcim.interfaces.filter(device=device_name, type="virtual")
loopbacks = [dict(i) for i in interfaces if i.name.lower().startswith("loopback")]
return loopbacks
except RequestError as e:
raise NetBoxAPIError(f"Failed to fetch loopbacks for {device_name}: {e}") from e
def get_device_lags(self, device_name: str) -> list[dict[str, Any]]:
"""
Get LAG/Port-Channel interfaces for a device.
Args:
device_name: Device name
Returns:
List of LAG interface dictionaries
"""
logger.debug(f"Fetching LAGs for device: {device_name}")
return self.get_device_interfaces(device_name, interface_type="lag")
def get_device_svis(self, device_name: str) -> list[dict[str, Any]]:
"""
Get SVI (VLAN) interfaces for a device.
Args:
device_name: Device name
Returns:
List of SVI interface dictionaries
"""
logger.debug(f"Fetching SVIs for device: {device_name}")
try:
interfaces = self._api.dcim.interfaces.filter(device=device_name, type="virtual")
svis = [dict(i) for i in interfaces if i.name.lower().startswith("vlan")]
return svis
except RequestError as e:
raise NetBoxAPIError(f"Failed to fetch SVIs for {device_name}: {e}") from e
# =========================================================================
# IP Address Methods
# =========================================================================
def get_interface_ips(
self, device_name: str, interface_name: str
) -> list[dict[str, Any]]:
"""
Get IP addresses assigned to an interface.
Args:
device_name: Device name
interface_name: Interface name
Returns:
List of IP address dictionaries
"""
logger.debug(f"Fetching IPs for {device_name}:{interface_name}")
try:
ips = self._api.ipam.ip_addresses.filter(
device=device_name, interface=interface_name
)
return [dict(ip) for ip in ips]
except RequestError as e:
raise NetBoxAPIError(
f"Failed to fetch IPs for {device_name}:{interface_name}: {e}"
) from e
def get_device_ips(self, device_name: str) -> list[dict[str, Any]]:
"""
Get all IP addresses for a device.
Args:
device_name: Device name
Returns:
List of IP address dictionaries with interface info
"""
logger.debug(f"Fetching all IPs for device: {device_name}")
try:
ips = self._api.ipam.ip_addresses.filter(device=device_name)
return [dict(ip) for ip in ips]
except RequestError as e:
raise NetBoxAPIError(f"Failed to fetch IPs for {device_name}: {e}") from e
# =========================================================================
# BGP Methods (Plugin)
# =========================================================================
def get_bgp_sessions(self, device_name: str | None = None) -> list[dict[str, Any]]:
"""
Get BGP sessions from NetBox BGP plugin.
Args:
device_name: Optional device name filter
Returns:
List of BGP session dictionaries
"""
logger.debug(f"Fetching BGP sessions for device: {device_name or 'all'}")
try:
if device_name:
sessions = self._api.plugins.bgp.sessions.filter(device=device_name)
else:
sessions = self._api.plugins.bgp.sessions.all()
return [dict(s) for s in sessions]
except RequestError as e:
raise NetBoxAPIError(f"Failed to fetch BGP sessions: {e}") from e
except AttributeError:
logger.warning("NetBox BGP plugin not available")
return []
def get_bgp_peer_groups(self, device_name: str | None = None) -> list[dict[str, Any]]:
"""
Get BGP peer groups from NetBox BGP plugin.
Args:
device_name: Optional device name filter
Returns:
List of BGP peer group dictionaries
"""
logger.debug(f"Fetching BGP peer groups for device: {device_name or 'all'}")
try:
if device_name:
peer_groups = self._api.plugins.bgp.peer_groups.filter(device=device_name)
else:
peer_groups = self._api.plugins.bgp.peer_groups.all()
return [dict(pg) for pg in peer_groups]
except RequestError as e:
raise NetBoxAPIError(f"Failed to fetch BGP peer groups: {e}") from e
except AttributeError:
logger.warning("NetBox BGP plugin not available")
return []
# =========================================================================
# VLAN Methods
# =========================================================================
def get_vlans(self, site: str | None = None) -> list[dict[str, Any]]:
"""
Get VLANs.
Args:
site: Optional site filter
Returns:
List of VLAN dictionaries
"""
logger.debug(f"Fetching VLANs for site: {site or 'all'}")
try:
if site:
vlans = self._api.ipam.vlans.filter(site=site)
else:
vlans = self._api.ipam.vlans.all()
return [dict(v) for v in vlans]
except RequestError as e:
raise NetBoxAPIError(f"Failed to fetch VLANs: {e}") from e
def get_vlan(self, vlan_id: int, site: str | None = None) -> dict[str, Any] | None:
"""
Get a specific VLAN by ID.
Args:
vlan_id: VLAN ID
site: Optional site filter
Returns:
VLAN dictionary or None if not found
"""
logger.debug(f"Fetching VLAN {vlan_id}")
try:
filters = {"vid": vlan_id}
if site:
filters["site"] = site
vlans = self._api.ipam.vlans.filter(**filters)
vlan_list = list(vlans)
if vlan_list:
return dict(vlan_list[0])
return None
except RequestError as e:
raise NetBoxAPIError(f"Failed to fetch VLAN {vlan_id}: {e}") from e
# =========================================================================
# L2VPN/EVPN Methods
# =========================================================================
def get_l2vpns(self, l2vpn_type: str = "evpn") -> list[dict[str, Any]]:
"""
Get L2VPNs (EVPN instances).
Args:
l2vpn_type: L2VPN type filter (default: 'evpn')
Returns:
List of L2VPN dictionaries with VNI identifiers
"""
logger.debug(f"Fetching L2VPNs of type: {l2vpn_type}")
try:
l2vpns = self._api.vpn.l2vpns.filter(type=l2vpn_type)
return [dict(l2vpn) for l2vpn in l2vpns]
except RequestError as e:
raise NetBoxAPIError(f"Failed to fetch L2VPNs: {e}") from e
def get_l2vpn_terminations(
self, device_name: str | None = None, l2vpn_name: str | None = None
) -> list[dict[str, Any]]:
"""
Get L2VPN terminations (VLAN-to-device mappings).
Args:
device_name: Optional device name filter
l2vpn_name: Optional L2VPN name filter
Returns:
List of L2VPN termination dictionaries
"""
logger.debug(
f"Fetching L2VPN terminations for device: {device_name or 'all'}, "
f"l2vpn: {l2vpn_name or 'all'}"
)
try:
filters = {}
if l2vpn_name:
filters["l2vpn"] = l2vpn_name
if device_name:
filters["device"] = device_name
if filters:
terminations = self._api.vpn.l2vpn_terminations.filter(**filters)
else:
terminations = self._api.vpn.l2vpn_terminations.all()
return [dict(t) for t in terminations]
except RequestError as e:
raise NetBoxAPIError(f"Failed to fetch L2VPN terminations: {e}") from e
def get_vni_for_vlan(self, vlan_id: int) -> int | None:
"""
Get VNI for a VLAN from L2VPN configuration.
Args:
vlan_id: VLAN ID
Returns:
VNI number or None if not mapped
"""
logger.debug(f"Fetching VNI for VLAN {vlan_id}")
try:
# Get L2VPN terminations for this VLAN
terminations = self._api.vpn.l2vpn_terminations.filter(vlan_id=vlan_id)
term_list = list(terminations)
if term_list:
l2vpn = term_list[0].l2vpn
if l2vpn and hasattr(l2vpn, "identifier"):
return l2vpn.identifier
return None
except RequestError as e:
raise NetBoxAPIError(f"Failed to fetch VNI for VLAN {vlan_id}: {e}") from e
# =========================================================================
# VRF Methods
# =========================================================================
def get_vrfs(self) -> list[dict[str, Any]]:
"""
Get all VRFs with route targets.
Returns:
List of VRF dictionaries including import/export route targets
"""
logger.debug("Fetching all VRFs")
try:
vrfs = self._api.ipam.vrfs.all()
return [dict(v) for v in vrfs]
except RequestError as e:
raise NetBoxAPIError(f"Failed to fetch VRFs: {e}") from e
def get_vrf(self, name: str) -> dict[str, Any] | None:
"""
Get a VRF by name.
Args:
name: VRF name
Returns:
VRF dictionary or None if not found
"""
logger.debug(f"Fetching VRF: {name}")
try:
vrf = self._api.ipam.vrfs.get(name=name)
if vrf:
return dict(vrf)
return None
except RequestError as e:
raise NetBoxAPIError(f"Failed to fetch VRF {name}: {e}") from e
def get_device_vrf_interfaces(
self, device_name: str, vrf_name: str
) -> list[dict[str, Any]]:
"""
Get interfaces assigned to a VRF on a device.
Args:
device_name: Device name
vrf_name: VRF name
Returns:
List of interface dictionaries assigned to the VRF
"""
logger.debug(f"Fetching VRF {vrf_name} interfaces for device: {device_name}")
try:
interfaces = self._api.dcim.interfaces.filter(device=device_name, vrf=vrf_name)
return [dict(i) for i in interfaces]
except RequestError as e:
raise NetBoxAPIError(
f"Failed to fetch VRF interfaces for {device_name}/{vrf_name}: {e}"
) from e
def get_route_targets(self) -> list[dict[str, Any]]:
"""
Get all route targets.
Returns:
List of route target dictionaries
"""
logger.debug("Fetching all route targets")
try:
rts = self._api.ipam.route_targets.all()
return [dict(rt) for rt in rts]
except RequestError as e:
raise NetBoxAPIError(f"Failed to fetch route targets: {e}") from e
# =========================================================================
# MLAG Methods (Custom Fields)
# =========================================================================
def get_device_mlag_config(self, device_name: str) -> dict[str, Any]:
"""
Get MLAG configuration from device custom fields.
Custom fields expected:
- mlag_domain_id: MLAG domain identifier
- mlag_peer_address: MLAG peer IP address
- mlag_local_address: MLAG local IP address
- mlag_virtual_mac: Shared virtual-router MAC
Args:
device_name: Device name
Returns:
MLAG configuration dictionary
"""
logger.debug(f"Fetching MLAG config for device: {device_name}")
device = self.get_device(device_name)
custom_fields = device.get("custom_fields", {}) or {}
return {
"domain_id": custom_fields.get("mlag_domain_id"),
"peer_address": custom_fields.get("mlag_peer_address"),
"local_address": custom_fields.get("mlag_local_address"),
"virtual_mac": custom_fields.get("mlag_virtual_mac"),
}
def get_mlag_peer_link(self, device_name: str) -> dict[str, Any] | None:
"""
Get MLAG peer-link interface for a device.
Looks for interface with mlag_peer_link custom field set to True.
Args:
device_name: Device name
Returns:
Peer-link interface dictionary or None
"""
logger.debug(f"Fetching MLAG peer-link for device: {device_name}")
try:
interfaces = self._api.dcim.interfaces.filter(device=device_name)
for iface in interfaces:
cf = iface.custom_fields or {}
if cf.get("mlag_peer_link"):
return dict(iface)
return None
except RequestError as e:
raise NetBoxAPIError(
f"Failed to fetch MLAG peer-link for {device_name}: {e}"
) from e
# =========================================================================
# Aggregated Intent Methods
# =========================================================================
def get_device_intent(self, device_name: str) -> dict[str, Any]:
"""
Get complete fabric intent for a single device.
Aggregates all relevant configuration data for a device including:
- Device info and custom fields
- Interfaces (physical, loopback, LAG, SVI)
- IP addresses
- BGP sessions and peer groups
- VLAN assignments
- L2VPN/EVPN terminations
- VRF assignments
- MLAG configuration
Args:
device_name: Device name
Returns:
Complete device intent dictionary
"""
logger.info(f"Building complete intent for device: {device_name}")
# Get device info
device = self.get_device(device_name)
# Get interfaces
all_interfaces = self.get_device_interfaces(device_name)
loopbacks = self.get_device_loopbacks(device_name)
lags = self.get_device_lags(device_name)
svis = self.get_device_svis(device_name)
# Get IP addresses
ip_addresses = self.get_device_ips(device_name)
# Get ASN
asn = self.get_device_asn(device_name)
# Get BGP configuration
bgp_sessions = self.get_bgp_sessions(device_name)
bgp_peer_groups = self.get_bgp_peer_groups(device_name)
# Get L2VPN terminations for this device
l2vpn_terminations = self.get_l2vpn_terminations(device_name=device_name)
# Get MLAG configuration
mlag_config = self.get_device_mlag_config(device_name)
mlag_peer_link = self.get_mlag_peer_link(device_name)
# Build intent dictionary
intent = {
"device": {
"name": device.get("name"),
"role": device.get("role", {}).get("slug") if device.get("role") else None,
"platform": (
device.get("platform", {}).get("slug") if device.get("platform") else None
),
"site": device.get("site", {}).get("slug") if device.get("site") else None,
"custom_fields": device.get("custom_fields", {}),
},
"asn": asn,
"interfaces": {
"all": all_interfaces,
"loopbacks": loopbacks,
"lags": lags,
"svis": svis,
},
"ip_addresses": ip_addresses,
"bgp": {
"sessions": bgp_sessions,
"peer_groups": bgp_peer_groups,
},
"l2vpn_terminations": l2vpn_terminations,
"mlag": {
"config": mlag_config,
"peer_link": mlag_peer_link,
},
}
logger.debug(f"Built intent for {device_name} with {len(all_interfaces)} interfaces")
return intent
def get_fabric_intent(self) -> dict[str, Any]:
"""
Get complete fabric intent for all devices.
Returns:
Dictionary with fabric-wide intent including:
- All devices with their intent
- Fabric-wide VLANs
- Fabric-wide L2VPNs
- Fabric-wide VRFs
"""
logger.info("Building complete fabric intent")
# Get all devices
spine_devices = self.get_spine_devices()
leaf_devices = self.get_leaf_devices()
# Get fabric-wide configuration
vlans = self.get_vlans()
l2vpns = self.get_l2vpns()
vrfs = self.get_vrfs()
# Build device intents
device_intents = {}
for device in spine_devices + leaf_devices:
name = device.get("name")
if name:
device_intents[name] = self.get_device_intent(name)
return {
"devices": device_intents,
"vlans": vlans,
"l2vpns": l2vpns,
"vrfs": vrfs,
}