Remove fabric provisioning script
The `scripts/provision_fabric.py` script and its associated NetBox client module (`src/netbox/`) have been removed. This functionality is no longer needed.
This commit is contained in:
@@ -1,84 +0,0 @@
|
|||||||
# NetBox Provisioning for EVPN-VXLAN Fabric
|
|
||||||
|
|
||||||
This directory contains scripts to populate NetBox with the fabric topology defined in [arista-evpn-vxlan-clab](https://gitea.arnodo.fr/Damien/arista-evpn-vxlan-clab).
|
|
||||||
|
|
||||||
## Prerequisites
|
|
||||||
|
|
||||||
- NetBox 4.4.x running and accessible
|
|
||||||
- NetBox BGP Plugin v0.17.x installed (optional, for BGP sessions)
|
|
||||||
- Python 3.9+
|
|
||||||
- API token with write permissions
|
|
||||||
|
|
||||||
## Installation
|
|
||||||
|
|
||||||
```bash
|
|
||||||
uv add pynetbox
|
|
||||||
```
|
|
||||||
|
|
||||||
## Usage
|
|
||||||
|
|
||||||
```bash
|
|
||||||
export NETBOX_URL="http://netbox.example.com"
|
|
||||||
export NETBOX_TOKEN="your-api-token"
|
|
||||||
uv run python scripts/provision_fabric.py
|
|
||||||
```
|
|
||||||
|
|
||||||
## What Gets Created
|
|
||||||
|
|
||||||
### Custom Fields
|
|
||||||
|
|
||||||
| Object Type | Field | Description |
|
|
||||||
| ----------- | -------------------- | -------------------------- |
|
|
||||||
| Device | `asn` | BGP ASN |
|
|
||||||
| Device | `mlag_domain_id` | MLAG domain identifier |
|
|
||||||
| Device | `mlag_peer_address` | MLAG peer IP |
|
|
||||||
| Device | `mlag_local_address` | MLAG local IP |
|
|
||||||
| Device | `mlag_virtual_mac` | Shared virtual MAC |
|
|
||||||
| Interface | `mlag_peer_link` | Marks peer-link interfaces |
|
|
||||||
| Interface | `mlag_id` | MLAG ID for host LAGs |
|
|
||||||
| VRF | `l3vni` | L3 VNI for EVPN |
|
|
||||||
| VRF | `vrf_vlan` | VLAN for L3 VNI SVI |
|
|
||||||
| IP Address | `virtual_ip` | Anycast/virtual IP flag |
|
|
||||||
|
|
||||||
### Organization
|
|
||||||
|
|
||||||
- **Site**: evpn-lab
|
|
||||||
- **Manufacturer**: Arista
|
|
||||||
- **Device Types**: cEOS-lab, Linux Server
|
|
||||||
- **Device Roles**: Spine, Leaf, Server
|
|
||||||
|
|
||||||
### Devices
|
|
||||||
|
|
||||||
| Device | Role | ASN | MLAG Domain |
|
|
||||||
| -------------- | ------ | ----- | ----------- |
|
|
||||||
| spine1, spine2 | Spine | 65000 | - |
|
|
||||||
| leaf1, leaf2 | Leaf | 65001 | MLAG1 |
|
|
||||||
| leaf3, leaf4 | Leaf | 65002 | MLAG2 |
|
|
||||||
| leaf5, leaf6 | Leaf | 65003 | MLAG3 |
|
|
||||||
| leaf7, leaf8 | Leaf | 65004 | MLAG4 |
|
|
||||||
| host1-4 | Server | - | - |
|
|
||||||
|
|
||||||
### Cabling
|
|
||||||
|
|
||||||
- Spine1/2 Ethernet1-8 → Leaf1-8 Ethernet11/12
|
|
||||||
- MLAG peer-links: Leaf pairs via Ethernet10
|
|
||||||
- Host dual-homing: eth1/eth2 to MLAG pairs
|
|
||||||
|
|
||||||
### IP Addressing
|
|
||||||
|
|
||||||
| Purpose | Prefix |
|
|
||||||
| --------------------- | ------------- |
|
|
||||||
| Spine1-Leaf P2P | 10.0.1.0/24 |
|
|
||||||
| Spine2-Leaf P2P | 10.0.2.0/24 |
|
|
||||||
| MLAG iBGP P2P | 10.0.3.0/24 |
|
|
||||||
| MLAG Peer VLAN | 10.0.199.0/24 |
|
|
||||||
| Loopback0 (Router-ID) | 10.0.250.0/24 |
|
|
||||||
| Loopback1 (VTEP) | 10.0.255.0/24 |
|
|
||||||
|
|
||||||
## Idempotency
|
|
||||||
|
|
||||||
The script is idempotent - running it multiple times will not create duplicate objects.
|
|
||||||
|
|
||||||
## Reference
|
|
||||||
|
|
||||||
- [NetBox Data Model Documentation](../docs/netbox-data-model.md)
|
|
||||||
File diff suppressed because it is too large
Load Diff
@@ -1,36 +0,0 @@
|
|||||||
"""
|
|
||||||
NetBox Client Module for Fabric Orchestrator.
|
|
||||||
|
|
||||||
This module provides a NetBox API client for fetching fabric intent data
|
|
||||||
using pynetbox.
|
|
||||||
|
|
||||||
Usage:
|
|
||||||
from src.netbox import FabricNetBoxClient
|
|
||||||
|
|
||||||
client = FabricNetBoxClient(
|
|
||||||
url="http://netbox.local",
|
|
||||||
token="your-token"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Get all leaf devices
|
|
||||||
leaves = client.get_leaf_devices()
|
|
||||||
|
|
||||||
# Get complete intent for a device
|
|
||||||
intent = client.get_device_intent("leaf1")
|
|
||||||
"""
|
|
||||||
|
|
||||||
from .client import (
|
|
||||||
FabricNetBoxClient,
|
|
||||||
NetBoxAPIError,
|
|
||||||
NetBoxConnectionError,
|
|
||||||
NetBoxError,
|
|
||||||
NetBoxNotFoundError,
|
|
||||||
)
|
|
||||||
|
|
||||||
__all__ = [
|
|
||||||
"FabricNetBoxClient",
|
|
||||||
"NetBoxError",
|
|
||||||
"NetBoxConnectionError",
|
|
||||||
"NetBoxNotFoundError",
|
|
||||||
"NetBoxAPIError",
|
|
||||||
]
|
|
||||||
@@ -1,855 +0,0 @@
|
|||||||
"""
|
|
||||||
NetBox API Client for Fabric Orchestrator.
|
|
||||||
|
|
||||||
This module provides a client for fetching fabric intent data from NetBox
|
|
||||||
using pynetbox. It retrieves device, interface, BGP, EVPN, and VRF configuration
|
|
||||||
to determine the desired network state.
|
|
||||||
|
|
||||||
Usage:
|
|
||||||
from src.netbox import FabricNetBoxClient
|
|
||||||
|
|
||||||
client = FabricNetBoxClient(
|
|
||||||
url="http://netbox.local",
|
|
||||||
token="your-token"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Get all leaf devices
|
|
||||||
leaves = client.get_leaf_devices()
|
|
||||||
|
|
||||||
# Get complete intent for a device
|
|
||||||
intent = client.get_device_intent("leaf1")
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import logging
|
|
||||||
import os
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
import pynetbox
|
|
||||||
from pynetbox.core.query import RequestError
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# Exceptions
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
|
|
||||||
class NetBoxError(Exception):
|
|
||||||
"""Base exception for NetBox operations."""
|
|
||||||
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class NetBoxConnectionError(NetBoxError):
|
|
||||||
"""Raised when connection to NetBox fails."""
|
|
||||||
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class NetBoxNotFoundError(NetBoxError):
|
|
||||||
"""Raised when requested resource is not found."""
|
|
||||||
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class NetBoxAPIError(NetBoxError):
|
|
||||||
"""Raised when NetBox API returns an error."""
|
|
||||||
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# Logger
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
|
||||||
# NetBox Client
|
|
||||||
# =============================================================================
|
|
||||||
|
|
||||||
|
|
||||||
class FabricNetBoxClient:
|
|
||||||
"""
|
|
||||||
NetBox API client for fabric intent retrieval.
|
|
||||||
|
|
||||||
Provides methods to fetch device, interface, BGP, EVPN, and VRF
|
|
||||||
configuration from NetBox to determine desired network state.
|
|
||||||
|
|
||||||
Supports configuration via environment variables:
|
|
||||||
- NETBOX_URL: NetBox server URL
|
|
||||||
- NETBOX_TOKEN: API token for authentication
|
|
||||||
|
|
||||||
Example:
|
|
||||||
client = FabricNetBoxClient(
|
|
||||||
url="http://netbox.local",
|
|
||||||
token="your-token"
|
|
||||||
)
|
|
||||||
leaves = client.get_leaf_devices()
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
url: str | None = None,
|
|
||||||
token: str | None = None,
|
|
||||||
threading: bool = False,
|
|
||||||
):
|
|
||||||
"""
|
|
||||||
Initialize NetBox client.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
url: NetBox server URL (or NETBOX_URL env var)
|
|
||||||
token: API token (or NETBOX_TOKEN env var)
|
|
||||||
threading: Enable threading for requests (default: False)
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
NetBoxConnectionError: If URL or token is not provided
|
|
||||||
"""
|
|
||||||
self.url = url or os.environ.get("NETBOX_URL")
|
|
||||||
self.token = token or os.environ.get("NETBOX_TOKEN")
|
|
||||||
|
|
||||||
if not self.url:
|
|
||||||
raise NetBoxConnectionError(
|
|
||||||
"NetBox URL not provided. Set NETBOX_URL environment variable or pass url parameter."
|
|
||||||
)
|
|
||||||
if not self.token:
|
|
||||||
raise NetBoxConnectionError(
|
|
||||||
"NetBox token not provided. Set NETBOX_TOKEN environment variable or pass token parameter."
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.debug(f"Connecting to NetBox at {self.url}")
|
|
||||||
|
|
||||||
try:
|
|
||||||
self._api = pynetbox.api(self.url, token=self.token, threading=threading)
|
|
||||||
# Test connection by fetching API status
|
|
||||||
self._api.status()
|
|
||||||
logger.info(f"Successfully connected to NetBox at {self.url}")
|
|
||||||
except Exception as e:
|
|
||||||
raise NetBoxConnectionError(f"Failed to connect to NetBox at {self.url}: {e}") from e
|
|
||||||
|
|
||||||
# =========================================================================
|
|
||||||
# Device Methods
|
|
||||||
# =========================================================================
|
|
||||||
|
|
||||||
def get_device(self, name: str) -> dict[str, Any]:
|
|
||||||
"""
|
|
||||||
Get a device by name.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
name: Device name
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Device data as dictionary
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
NetBoxNotFoundError: If device not found
|
|
||||||
"""
|
|
||||||
logger.debug(f"Fetching device: {name}")
|
|
||||||
try:
|
|
||||||
device = self._api.dcim.devices.get(name=name)
|
|
||||||
if not device:
|
|
||||||
raise NetBoxNotFoundError(f"Device not found: {name}")
|
|
||||||
return dict(device)
|
|
||||||
except RequestError as e:
|
|
||||||
raise NetBoxAPIError(f"Failed to fetch device {name}: {e}") from e
|
|
||||||
|
|
||||||
def get_devices_by_role(self, role: str) -> list[dict[str, Any]]:
|
|
||||||
"""
|
|
||||||
Get all devices with a specific role.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
role: Device role slug (e.g., 'leaf', 'spine')
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of device dictionaries
|
|
||||||
"""
|
|
||||||
logger.debug(f"Fetching devices with role: {role}")
|
|
||||||
try:
|
|
||||||
devices = self._api.dcim.devices.filter(role=role)
|
|
||||||
return [dict(d) for d in devices]
|
|
||||||
except RequestError as e:
|
|
||||||
raise NetBoxAPIError(f"Failed to fetch devices with role {role}: {e}") from e
|
|
||||||
|
|
||||||
def get_leaf_devices(self) -> list[dict[str, Any]]:
|
|
||||||
"""
|
|
||||||
Get all leaf devices.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of leaf device dictionaries
|
|
||||||
"""
|
|
||||||
return self.get_devices_by_role("leaf")
|
|
||||||
|
|
||||||
def get_spine_devices(self) -> list[dict[str, Any]]:
|
|
||||||
"""
|
|
||||||
Get all spine devices.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of spine device dictionaries
|
|
||||||
"""
|
|
||||||
return self.get_devices_by_role("spine")
|
|
||||||
|
|
||||||
def get_device_asn(self, device_name: str) -> int | None:
|
|
||||||
"""
|
|
||||||
Get ASN assigned to a device from custom field.
|
|
||||||
|
|
||||||
Note: NetBox 4.4 does not support filtering ASNs by device.
|
|
||||||
ASN is stored as a custom field on the Device object.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
device_name: Device name
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
ASN number or None if not assigned
|
|
||||||
"""
|
|
||||||
logger.debug(f"Fetching ASN for device: {device_name}")
|
|
||||||
device = self.get_device(device_name)
|
|
||||||
return device.get("custom_fields", {}).get("asn")
|
|
||||||
|
|
||||||
# =========================================================================
|
|
||||||
# Interface Methods
|
|
||||||
# =========================================================================
|
|
||||||
|
|
||||||
def get_device_interfaces(
|
|
||||||
self, device_name: str, interface_type: str | None = None
|
|
||||||
) -> list[dict[str, Any]]:
|
|
||||||
"""
|
|
||||||
Get interfaces for a device.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
device_name: Device name
|
|
||||||
interface_type: Optional interface type filter (e.g., 'virtual', 'lag', '1000base-t')
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of interface dictionaries
|
|
||||||
"""
|
|
||||||
logger.debug(f"Fetching interfaces for device: {device_name}, type: {interface_type}")
|
|
||||||
try:
|
|
||||||
filters = {"device": device_name}
|
|
||||||
if interface_type:
|
|
||||||
filters["type"] = interface_type
|
|
||||||
interfaces = self._api.dcim.interfaces.filter(**filters)
|
|
||||||
return [dict(i) for i in interfaces]
|
|
||||||
except RequestError as e:
|
|
||||||
raise NetBoxAPIError(f"Failed to fetch interfaces for {device_name}: {e}") from e
|
|
||||||
|
|
||||||
def get_device_loopbacks(self, device_name: str) -> list[dict[str, Any]]:
|
|
||||||
"""
|
|
||||||
Get loopback interfaces for a device.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
device_name: Device name
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of loopback interface dictionaries
|
|
||||||
"""
|
|
||||||
logger.debug(f"Fetching loopbacks for device: {device_name}")
|
|
||||||
try:
|
|
||||||
interfaces = self._api.dcim.interfaces.filter(device=device_name, type="virtual")
|
|
||||||
loopbacks = [dict(i) for i in interfaces if i.name.lower().startswith("loopback")]
|
|
||||||
return loopbacks
|
|
||||||
except RequestError as e:
|
|
||||||
raise NetBoxAPIError(f"Failed to fetch loopbacks for {device_name}: {e}") from e
|
|
||||||
|
|
||||||
def get_device_lags(self, device_name: str) -> list[dict[str, Any]]:
|
|
||||||
"""
|
|
||||||
Get LAG/Port-Channel interfaces for a device.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
device_name: Device name
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of LAG interface dictionaries
|
|
||||||
"""
|
|
||||||
logger.debug(f"Fetching LAGs for device: {device_name}")
|
|
||||||
return self.get_device_interfaces(device_name, interface_type="lag")
|
|
||||||
|
|
||||||
def get_device_svis(self, device_name: str) -> list[dict[str, Any]]:
|
|
||||||
"""
|
|
||||||
Get SVI (VLAN) interfaces for a device.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
device_name: Device name
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of SVI interface dictionaries
|
|
||||||
"""
|
|
||||||
logger.debug(f"Fetching SVIs for device: {device_name}")
|
|
||||||
try:
|
|
||||||
interfaces = self._api.dcim.interfaces.filter(device=device_name, type="virtual")
|
|
||||||
svis = [dict(i) for i in interfaces if i.name.lower().startswith("vlan")]
|
|
||||||
return svis
|
|
||||||
except RequestError as e:
|
|
||||||
raise NetBoxAPIError(f"Failed to fetch SVIs for {device_name}: {e}") from e
|
|
||||||
|
|
||||||
# =========================================================================
|
|
||||||
# IP Address Methods
|
|
||||||
# =========================================================================
|
|
||||||
|
|
||||||
def get_interface_ips(
|
|
||||||
self, device_name: str, interface_name: str
|
|
||||||
) -> list[dict[str, Any]]:
|
|
||||||
"""
|
|
||||||
Get IP addresses assigned to an interface.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
device_name: Device name
|
|
||||||
interface_name: Interface name
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of IP address dictionaries
|
|
||||||
"""
|
|
||||||
logger.debug(f"Fetching IPs for {device_name}:{interface_name}")
|
|
||||||
try:
|
|
||||||
ips = self._api.ipam.ip_addresses.filter(
|
|
||||||
device=device_name, interface=interface_name
|
|
||||||
)
|
|
||||||
return [dict(ip) for ip in ips]
|
|
||||||
except RequestError as e:
|
|
||||||
raise NetBoxAPIError(
|
|
||||||
f"Failed to fetch IPs for {device_name}:{interface_name}: {e}"
|
|
||||||
) from e
|
|
||||||
|
|
||||||
def get_device_ips(self, device_name: str) -> list[dict[str, Any]]:
|
|
||||||
"""
|
|
||||||
Get all IP addresses for a device.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
device_name: Device name
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of IP address dictionaries with interface info
|
|
||||||
"""
|
|
||||||
logger.debug(f"Fetching all IPs for device: {device_name}")
|
|
||||||
try:
|
|
||||||
ips = self._api.ipam.ip_addresses.filter(device=device_name)
|
|
||||||
return [dict(ip) for ip in ips]
|
|
||||||
except RequestError as e:
|
|
||||||
raise NetBoxAPIError(f"Failed to fetch IPs for {device_name}: {e}") from e
|
|
||||||
|
|
||||||
# =========================================================================
|
|
||||||
# BGP Methods (Plugin)
|
|
||||||
# =========================================================================
|
|
||||||
|
|
||||||
def get_bgp_sessions(self, device_name: str | None = None) -> list[dict[str, Any]]:
|
|
||||||
"""
|
|
||||||
Get BGP sessions from NetBox BGP plugin.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
device_name: Optional device name filter
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of BGP session dictionaries
|
|
||||||
"""
|
|
||||||
logger.debug(f"Fetching BGP sessions for device: {device_name or 'all'}")
|
|
||||||
try:
|
|
||||||
if device_name:
|
|
||||||
sessions = self._api.plugins.bgp.sessions.filter(device=device_name)
|
|
||||||
else:
|
|
||||||
sessions = self._api.plugins.bgp.sessions.all()
|
|
||||||
return [dict(s) for s in sessions]
|
|
||||||
except RequestError as e:
|
|
||||||
raise NetBoxAPIError(f"Failed to fetch BGP sessions: {e}") from e
|
|
||||||
except AttributeError:
|
|
||||||
logger.warning("NetBox BGP plugin not available")
|
|
||||||
return []
|
|
||||||
|
|
||||||
def get_bgp_peer_groups(self, device_name: str | None = None) -> list[dict[str, Any]]:
|
|
||||||
"""
|
|
||||||
Get BGP peer groups from NetBox BGP plugin.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
device_name: Optional device name filter
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of BGP peer group dictionaries
|
|
||||||
"""
|
|
||||||
logger.debug(f"Fetching BGP peer groups for device: {device_name or 'all'}")
|
|
||||||
try:
|
|
||||||
if device_name:
|
|
||||||
peer_groups = self._api.plugins.bgp.peer_groups.filter(device=device_name)
|
|
||||||
else:
|
|
||||||
peer_groups = self._api.plugins.bgp.peer_groups.all()
|
|
||||||
return [dict(pg) for pg in peer_groups]
|
|
||||||
except RequestError as e:
|
|
||||||
raise NetBoxAPIError(f"Failed to fetch BGP peer groups: {e}") from e
|
|
||||||
except AttributeError:
|
|
||||||
logger.warning("NetBox BGP plugin not available")
|
|
||||||
return []
|
|
||||||
|
|
||||||
# =========================================================================
|
|
||||||
# VLAN Methods
|
|
||||||
# =========================================================================
|
|
||||||
|
|
||||||
def get_vlans(self, site: str | None = None) -> list[dict[str, Any]]:
|
|
||||||
"""
|
|
||||||
Get VLANs.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
site: Optional site filter
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of VLAN dictionaries
|
|
||||||
"""
|
|
||||||
logger.debug(f"Fetching VLANs for site: {site or 'all'}")
|
|
||||||
try:
|
|
||||||
if site:
|
|
||||||
vlans = self._api.ipam.vlans.filter(site=site)
|
|
||||||
else:
|
|
||||||
vlans = self._api.ipam.vlans.all()
|
|
||||||
return [dict(v) for v in vlans]
|
|
||||||
except RequestError as e:
|
|
||||||
raise NetBoxAPIError(f"Failed to fetch VLANs: {e}") from e
|
|
||||||
|
|
||||||
def get_vlan(self, vlan_id: int, site: str | None = None) -> dict[str, Any] | None:
|
|
||||||
"""
|
|
||||||
Get a specific VLAN by ID.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
vlan_id: VLAN ID
|
|
||||||
site: Optional site filter
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
VLAN dictionary or None if not found
|
|
||||||
"""
|
|
||||||
logger.debug(f"Fetching VLAN {vlan_id}")
|
|
||||||
try:
|
|
||||||
filters = {"vid": vlan_id}
|
|
||||||
if site:
|
|
||||||
filters["site"] = site
|
|
||||||
vlans = self._api.ipam.vlans.filter(**filters)
|
|
||||||
vlan_list = list(vlans)
|
|
||||||
if vlan_list:
|
|
||||||
return dict(vlan_list[0])
|
|
||||||
return None
|
|
||||||
except RequestError as e:
|
|
||||||
raise NetBoxAPIError(f"Failed to fetch VLAN {vlan_id}: {e}") from e
|
|
||||||
|
|
||||||
# =========================================================================
|
|
||||||
# L2VPN/EVPN Methods
|
|
||||||
# =========================================================================
|
|
||||||
|
|
||||||
def get_l2vpns(self, l2vpn_type: str = "evpn") -> list[dict[str, Any]]:
|
|
||||||
"""
|
|
||||||
Get L2VPNs (EVPN instances).
|
|
||||||
|
|
||||||
Args:
|
|
||||||
l2vpn_type: L2VPN type filter (default: 'evpn')
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of L2VPN dictionaries with VNI identifiers
|
|
||||||
"""
|
|
||||||
logger.debug(f"Fetching L2VPNs of type: {l2vpn_type}")
|
|
||||||
try:
|
|
||||||
l2vpns = self._api.vpn.l2vpns.filter(type=l2vpn_type)
|
|
||||||
return [dict(l2vpn) for l2vpn in l2vpns]
|
|
||||||
except RequestError as e:
|
|
||||||
raise NetBoxAPIError(f"Failed to fetch L2VPNs: {e}") from e
|
|
||||||
|
|
||||||
def get_l2vpn_terminations(
|
|
||||||
self, device_name: str | None = None, l2vpn_name: str | None = None
|
|
||||||
) -> list[dict[str, Any]]:
|
|
||||||
"""
|
|
||||||
Get L2VPN terminations (VLAN-to-device mappings).
|
|
||||||
|
|
||||||
Note: NetBox 4.4 does not support filtering L2VPN terminations by device.
|
|
||||||
L2VPN terminations link to VLANs, not devices directly.
|
|
||||||
The device_name parameter is accepted for API compatibility but ignored.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
device_name: Ignored - not supported by NetBox API
|
|
||||||
l2vpn_name: Optional L2VPN name filter
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of L2VPN termination dictionaries
|
|
||||||
"""
|
|
||||||
if device_name:
|
|
||||||
logger.warning(
|
|
||||||
f"device_name filter '{device_name}' ignored - "
|
|
||||||
"L2VPN terminations cannot be filtered by device in NetBox 4.4"
|
|
||||||
)
|
|
||||||
logger.debug(f"Fetching L2VPN terminations for l2vpn: {l2vpn_name or 'all'}")
|
|
||||||
try:
|
|
||||||
filters = {}
|
|
||||||
if l2vpn_name:
|
|
||||||
filters["l2vpn"] = l2vpn_name
|
|
||||||
|
|
||||||
if filters:
|
|
||||||
terminations = self._api.vpn.l2vpn_terminations.filter(**filters)
|
|
||||||
else:
|
|
||||||
terminations = self._api.vpn.l2vpn_terminations.all()
|
|
||||||
return [dict(t) for t in terminations]
|
|
||||||
except RequestError as e:
|
|
||||||
raise NetBoxAPIError(f"Failed to fetch L2VPN terminations: {e}") from e
|
|
||||||
|
|
||||||
def get_vni_for_vlan(self, vlan_id: int) -> int | None:
|
|
||||||
"""
|
|
||||||
Get VNI for a VLAN from L2VPN configuration.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
vlan_id: VLAN ID
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
VNI number or None if not mapped
|
|
||||||
"""
|
|
||||||
logger.debug(f"Fetching VNI for VLAN {vlan_id}")
|
|
||||||
try:
|
|
||||||
# Get L2VPN terminations for this VLAN
|
|
||||||
terminations = self._api.vpn.l2vpn_terminations.filter(vlan_id=vlan_id)
|
|
||||||
term_list = list(terminations)
|
|
||||||
if term_list:
|
|
||||||
l2vpn = term_list[0].l2vpn
|
|
||||||
if l2vpn and hasattr(l2vpn, "identifier"):
|
|
||||||
return l2vpn.identifier
|
|
||||||
return None
|
|
||||||
except RequestError as e:
|
|
||||||
raise NetBoxAPIError(f"Failed to fetch VNI for VLAN {vlan_id}: {e}") from e
|
|
||||||
|
|
||||||
# =========================================================================
|
|
||||||
# VRF Methods
|
|
||||||
# =========================================================================
|
|
||||||
|
|
||||||
def get_vrfs(self) -> list[dict[str, Any]]:
|
|
||||||
"""
|
|
||||||
Get all VRFs with route targets.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of VRF dictionaries including import/export route targets
|
|
||||||
"""
|
|
||||||
logger.debug("Fetching all VRFs")
|
|
||||||
try:
|
|
||||||
vrfs = self._api.ipam.vrfs.all()
|
|
||||||
return [dict(v) for v in vrfs]
|
|
||||||
except RequestError as e:
|
|
||||||
raise NetBoxAPIError(f"Failed to fetch VRFs: {e}") from e
|
|
||||||
|
|
||||||
def get_vrf(self, name: str) -> dict[str, Any] | None:
|
|
||||||
"""
|
|
||||||
Get a VRF by name.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
name: VRF name
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
VRF dictionary or None if not found
|
|
||||||
"""
|
|
||||||
logger.debug(f"Fetching VRF: {name}")
|
|
||||||
try:
|
|
||||||
vrf = self._api.ipam.vrfs.get(name=name)
|
|
||||||
if vrf:
|
|
||||||
return dict(vrf)
|
|
||||||
return None
|
|
||||||
except RequestError as e:
|
|
||||||
raise NetBoxAPIError(f"Failed to fetch VRF {name}: {e}") from e
|
|
||||||
|
|
||||||
def get_device_vrf_interfaces(
|
|
||||||
self, device_name: str, vrf_name: str
|
|
||||||
) -> list[dict[str, Any]]:
|
|
||||||
"""
|
|
||||||
Get interfaces assigned to a VRF on a device.
|
|
||||||
|
|
||||||
Note: NetBox 4.4 does not support filtering interfaces by VRF directly.
|
|
||||||
VRF is assigned to IP addresses, not interfaces. This method queries
|
|
||||||
IP addresses with VRF filter and extracts the associated interfaces.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
device_name: Device name
|
|
||||||
vrf_name: VRF name
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of interface dictionaries assigned to the VRF
|
|
||||||
"""
|
|
||||||
logger.debug(f"Fetching VRF {vrf_name} interfaces for device: {device_name}")
|
|
||||||
try:
|
|
||||||
# Get IPs in this VRF for this device
|
|
||||||
ips = self._api.ipam.ip_addresses.filter(device=device_name, vrf=vrf_name)
|
|
||||||
|
|
||||||
# Extract unique interface IDs
|
|
||||||
interface_ids = set()
|
|
||||||
for ip in ips:
|
|
||||||
if ip.assigned_object_type == "dcim.interface" and ip.assigned_object_id:
|
|
||||||
interface_ids.add(ip.assigned_object_id)
|
|
||||||
|
|
||||||
# Fetch interfaces
|
|
||||||
if not interface_ids:
|
|
||||||
return []
|
|
||||||
|
|
||||||
interfaces = []
|
|
||||||
for iid in interface_ids:
|
|
||||||
iface = self._api.dcim.interfaces.get(id=iid)
|
|
||||||
if iface:
|
|
||||||
interfaces.append(dict(iface))
|
|
||||||
return interfaces
|
|
||||||
except RequestError as e:
|
|
||||||
raise NetBoxAPIError(
|
|
||||||
f"Failed to fetch VRF interfaces for {device_name}/{vrf_name}: {e}"
|
|
||||||
) from e
|
|
||||||
|
|
||||||
def get_route_targets(self) -> list[dict[str, Any]]:
|
|
||||||
"""
|
|
||||||
Get all route targets.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of route target dictionaries
|
|
||||||
"""
|
|
||||||
logger.debug("Fetching all route targets")
|
|
||||||
try:
|
|
||||||
rts = self._api.ipam.route_targets.all()
|
|
||||||
return [dict(rt) for rt in rts]
|
|
||||||
except RequestError as e:
|
|
||||||
raise NetBoxAPIError(f"Failed to fetch route targets: {e}") from e
|
|
||||||
|
|
||||||
# =========================================================================
|
|
||||||
# MLAG Methods (Custom Fields)
|
|
||||||
# =========================================================================
|
|
||||||
|
|
||||||
def get_device_mlag_config(self, device_name: str) -> dict[str, Any]:
|
|
||||||
"""
|
|
||||||
Get MLAG configuration from device custom fields.
|
|
||||||
|
|
||||||
Custom fields expected:
|
|
||||||
- mlag_domain_id: MLAG domain identifier
|
|
||||||
- mlag_peer_address: MLAG peer IP address
|
|
||||||
- mlag_local_address: MLAG local IP address
|
|
||||||
- mlag_virtual_mac: Shared virtual-router MAC
|
|
||||||
|
|
||||||
Args:
|
|
||||||
device_name: Device name
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
MLAG configuration dictionary
|
|
||||||
"""
|
|
||||||
logger.debug(f"Fetching MLAG config for device: {device_name}")
|
|
||||||
device = self.get_device(device_name)
|
|
||||||
custom_fields = device.get("custom_fields", {}) or {}
|
|
||||||
|
|
||||||
return {
|
|
||||||
"domain_id": custom_fields.get("mlag_domain_id"),
|
|
||||||
"peer_address": custom_fields.get("mlag_peer_address"),
|
|
||||||
"local_address": custom_fields.get("mlag_local_address"),
|
|
||||||
"virtual_mac": custom_fields.get("mlag_virtual_mac"),
|
|
||||||
}
|
|
||||||
|
|
||||||
def get_mlag_peer_link(self, device_name: str) -> dict[str, Any] | None:
|
|
||||||
"""
|
|
||||||
Get MLAG peer-link interface for a device.
|
|
||||||
|
|
||||||
Looks for interface with mlag_peer_link custom field set to True.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
device_name: Device name
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Peer-link interface dictionary or None
|
|
||||||
"""
|
|
||||||
logger.debug(f"Fetching MLAG peer-link for device: {device_name}")
|
|
||||||
try:
|
|
||||||
interfaces = self._api.dcim.interfaces.filter(device=device_name)
|
|
||||||
for iface in interfaces:
|
|
||||||
cf = iface.custom_fields or {}
|
|
||||||
if cf.get("mlag_peer_link"):
|
|
||||||
return dict(iface)
|
|
||||||
return None
|
|
||||||
except RequestError as e:
|
|
||||||
raise NetBoxAPIError(
|
|
||||||
f"Failed to fetch MLAG peer-link for {device_name}: {e}"
|
|
||||||
) from e
|
|
||||||
|
|
||||||
def get_interface_mlag_id(
|
|
||||||
self, device_name: str, interface_name: str
|
|
||||||
) -> int | None:
|
|
||||||
"""
|
|
||||||
Get MLAG ID from interface custom field.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
device_name: Device name
|
|
||||||
interface_name: Interface name
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
MLAG ID or None if not assigned
|
|
||||||
"""
|
|
||||||
logger.debug(f"Fetching MLAG ID for {device_name}:{interface_name}")
|
|
||||||
try:
|
|
||||||
iface = self._api.dcim.interfaces.get(device=device_name, name=interface_name)
|
|
||||||
if iface:
|
|
||||||
cf = iface.custom_fields or {}
|
|
||||||
return cf.get("mlag_id")
|
|
||||||
return None
|
|
||||||
except RequestError as e:
|
|
||||||
raise NetBoxAPIError(
|
|
||||||
f"Failed to fetch MLAG ID for {device_name}:{interface_name}: {e}"
|
|
||||||
) from e
|
|
||||||
|
|
||||||
# =========================================================================
|
|
||||||
# VRF Custom Field Methods
|
|
||||||
# =========================================================================
|
|
||||||
|
|
||||||
def get_vrf_l3vni(self, vrf_name: str) -> int | None:
|
|
||||||
"""
|
|
||||||
Get L3 VNI from VRF custom field.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
vrf_name: VRF name
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
L3 VNI number or None if not assigned
|
|
||||||
"""
|
|
||||||
logger.debug(f"Fetching L3 VNI for VRF: {vrf_name}")
|
|
||||||
vrf = self.get_vrf(vrf_name)
|
|
||||||
if vrf:
|
|
||||||
return vrf.get("custom_fields", {}).get("l3vni")
|
|
||||||
return None
|
|
||||||
|
|
||||||
# =========================================================================
|
|
||||||
# Virtual IP Methods (Custom Fields)
|
|
||||||
# =========================================================================
|
|
||||||
|
|
||||||
def get_virtual_ips(self, device_name: str | None = None) -> list[dict[str, Any]]:
|
|
||||||
"""
|
|
||||||
Get IP addresses marked as virtual/anycast IPs.
|
|
||||||
|
|
||||||
Retrieves IPs where the custom field `virtual_ip` is True.
|
|
||||||
These are typically anycast IPs shared across an MLAG pair.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
device_name: Optional device name filter
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of virtual IP address dictionaries
|
|
||||||
"""
|
|
||||||
logger.debug(f"Fetching virtual IPs for device: {device_name or 'all'}")
|
|
||||||
try:
|
|
||||||
if device_name:
|
|
||||||
ips = self._api.ipam.ip_addresses.filter(device=device_name)
|
|
||||||
else:
|
|
||||||
ips = self._api.ipam.ip_addresses.all()
|
|
||||||
|
|
||||||
virtual_ips = []
|
|
||||||
for ip in ips:
|
|
||||||
cf = ip.custom_fields or {}
|
|
||||||
if cf.get("virtual_ip"):
|
|
||||||
virtual_ips.append(dict(ip))
|
|
||||||
return virtual_ips
|
|
||||||
except RequestError as e:
|
|
||||||
raise NetBoxAPIError(f"Failed to fetch virtual IPs: {e}") from e
|
|
||||||
|
|
||||||
# =========================================================================
|
|
||||||
# Aggregated Intent Methods
|
|
||||||
# =========================================================================
|
|
||||||
|
|
||||||
def get_device_intent(self, device_name: str) -> dict[str, Any]:
|
|
||||||
"""
|
|
||||||
Get complete fabric intent for a single device.
|
|
||||||
|
|
||||||
Aggregates all relevant configuration data for a device including:
|
|
||||||
- Device info and custom fields
|
|
||||||
- Interfaces (physical, loopback, LAG, SVI)
|
|
||||||
- IP addresses
|
|
||||||
- BGP sessions and peer groups
|
|
||||||
- VLAN assignments
|
|
||||||
- L2VPN/EVPN terminations
|
|
||||||
- VRF assignments
|
|
||||||
- MLAG configuration
|
|
||||||
|
|
||||||
Args:
|
|
||||||
device_name: Device name
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Complete device intent dictionary
|
|
||||||
"""
|
|
||||||
logger.info(f"Building complete intent for device: {device_name}")
|
|
||||||
|
|
||||||
# Get device info
|
|
||||||
device = self.get_device(device_name)
|
|
||||||
|
|
||||||
# Get interfaces
|
|
||||||
all_interfaces = self.get_device_interfaces(device_name)
|
|
||||||
loopbacks = self.get_device_loopbacks(device_name)
|
|
||||||
lags = self.get_device_lags(device_name)
|
|
||||||
svis = self.get_device_svis(device_name)
|
|
||||||
|
|
||||||
# Get IP addresses
|
|
||||||
ip_addresses = self.get_device_ips(device_name)
|
|
||||||
|
|
||||||
# Get ASN
|
|
||||||
asn = self.get_device_asn(device_name)
|
|
||||||
|
|
||||||
# Get BGP configuration
|
|
||||||
bgp_sessions = self.get_bgp_sessions(device_name)
|
|
||||||
bgp_peer_groups = self.get_bgp_peer_groups(device_name)
|
|
||||||
|
|
||||||
# Get L2VPN terminations for this device
|
|
||||||
l2vpn_terminations = self.get_l2vpn_terminations(device_name=device_name)
|
|
||||||
|
|
||||||
# Get MLAG configuration
|
|
||||||
mlag_config = self.get_device_mlag_config(device_name)
|
|
||||||
mlag_peer_link = self.get_mlag_peer_link(device_name)
|
|
||||||
|
|
||||||
# Build intent dictionary
|
|
||||||
intent = {
|
|
||||||
"device": {
|
|
||||||
"name": device.get("name"),
|
|
||||||
"role": device.get("role", {}).get("slug") if device.get("role") else None,
|
|
||||||
"platform": (
|
|
||||||
device.get("platform", {}).get("slug") if device.get("platform") else None
|
|
||||||
),
|
|
||||||
"site": device.get("site", {}).get("slug") if device.get("site") else None,
|
|
||||||
"custom_fields": device.get("custom_fields", {}),
|
|
||||||
},
|
|
||||||
"asn": asn,
|
|
||||||
"interfaces": {
|
|
||||||
"all": all_interfaces,
|
|
||||||
"loopbacks": loopbacks,
|
|
||||||
"lags": lags,
|
|
||||||
"svis": svis,
|
|
||||||
},
|
|
||||||
"ip_addresses": ip_addresses,
|
|
||||||
"bgp": {
|
|
||||||
"sessions": bgp_sessions,
|
|
||||||
"peer_groups": bgp_peer_groups,
|
|
||||||
},
|
|
||||||
"l2vpn_terminations": l2vpn_terminations,
|
|
||||||
"mlag": {
|
|
||||||
"config": mlag_config,
|
|
||||||
"peer_link": mlag_peer_link,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.debug(f"Built intent for {device_name} with {len(all_interfaces)} interfaces")
|
|
||||||
return intent
|
|
||||||
|
|
||||||
def get_fabric_intent(self) -> dict[str, Any]:
|
|
||||||
"""
|
|
||||||
Get complete fabric intent for all devices.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Dictionary with fabric-wide intent including:
|
|
||||||
- All devices with their intent
|
|
||||||
- Fabric-wide VLANs
|
|
||||||
- Fabric-wide L2VPNs
|
|
||||||
- Fabric-wide VRFs
|
|
||||||
"""
|
|
||||||
logger.info("Building complete fabric intent")
|
|
||||||
|
|
||||||
# Get all devices
|
|
||||||
spine_devices = self.get_spine_devices()
|
|
||||||
leaf_devices = self.get_leaf_devices()
|
|
||||||
|
|
||||||
# Get fabric-wide configuration
|
|
||||||
vlans = self.get_vlans()
|
|
||||||
l2vpns = self.get_l2vpns()
|
|
||||||
vrfs = self.get_vrfs()
|
|
||||||
|
|
||||||
# Build device intents
|
|
||||||
device_intents = {}
|
|
||||||
for device in spine_devices + leaf_devices:
|
|
||||||
name = device.get("name")
|
|
||||||
if name:
|
|
||||||
device_intents[name] = self.get_device_intent(name)
|
|
||||||
|
|
||||||
return {
|
|
||||||
"devices": device_intents,
|
|
||||||
"vlans": vlans,
|
|
||||||
"l2vpns": l2vpns,
|
|
||||||
"vrfs": vrfs,
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user