From e46946606c2c20b60078ad5a4c0af0f9ef2b1dfd Mon Sep 17 00:00:00 2001 From: Damien Date: Thu, 5 Feb 2026 09:38:05 +0100 Subject: [PATCH] Remove fabric provisioning script The `scripts/provision_fabric.py` script and its associated NetBox client module (`src/netbox/`) have been removed. This functionality is no longer needed. --- scripts/README.md | 84 --- scripts/provision_fabric.py | 1064 ----------------------------------- src/netbox/__init__.py | 36 -- src/netbox/client.py | 855 ---------------------------- 4 files changed, 2039 deletions(-) delete mode 100644 scripts/README.md delete mode 100644 scripts/provision_fabric.py delete mode 100644 src/netbox/__init__.py delete mode 100644 src/netbox/client.py diff --git a/scripts/README.md b/scripts/README.md deleted file mode 100644 index e0ff3b7..0000000 --- a/scripts/README.md +++ /dev/null @@ -1,84 +0,0 @@ -# NetBox Provisioning for EVPN-VXLAN Fabric - -This directory contains scripts to populate NetBox with the fabric topology defined in [arista-evpn-vxlan-clab](https://gitea.arnodo.fr/Damien/arista-evpn-vxlan-clab). - -## Prerequisites - -- NetBox 4.4.x running and accessible -- NetBox BGP Plugin v0.17.x installed (optional, for BGP sessions) -- Python 3.9+ -- API token with write permissions - -## Installation - -```bash -uv add pynetbox -``` - -## Usage - -```bash -export NETBOX_URL="http://netbox.example.com" -export NETBOX_TOKEN="your-api-token" -uv run python scripts/provision_fabric.py -``` - -## What Gets Created - -### Custom Fields - -| Object Type | Field | Description | -| ----------- | -------------------- | -------------------------- | -| Device | `asn` | BGP ASN | -| Device | `mlag_domain_id` | MLAG domain identifier | -| Device | `mlag_peer_address` | MLAG peer IP | -| Device | `mlag_local_address` | MLAG local IP | -| Device | `mlag_virtual_mac` | Shared virtual MAC | -| Interface | `mlag_peer_link` | Marks peer-link interfaces | -| Interface | `mlag_id` | MLAG ID for host LAGs | -| VRF | `l3vni` | L3 VNI for EVPN | -| VRF | `vrf_vlan` | VLAN for L3 VNI SVI | -| IP Address | `virtual_ip` | Anycast/virtual IP flag | - -### Organization - -- **Site**: evpn-lab -- **Manufacturer**: Arista -- **Device Types**: cEOS-lab, Linux Server -- **Device Roles**: Spine, Leaf, Server - -### Devices - -| Device | Role | ASN | MLAG Domain | -| -------------- | ------ | ----- | ----------- | -| spine1, spine2 | Spine | 65000 | - | -| leaf1, leaf2 | Leaf | 65001 | MLAG1 | -| leaf3, leaf4 | Leaf | 65002 | MLAG2 | -| leaf5, leaf6 | Leaf | 65003 | MLAG3 | -| leaf7, leaf8 | Leaf | 65004 | MLAG4 | -| host1-4 | Server | - | - | - -### Cabling - -- Spine1/2 Ethernet1-8 → Leaf1-8 Ethernet11/12 -- MLAG peer-links: Leaf pairs via Ethernet10 -- Host dual-homing: eth1/eth2 to MLAG pairs - -### IP Addressing - -| Purpose | Prefix | -| --------------------- | ------------- | -| Spine1-Leaf P2P | 10.0.1.0/24 | -| Spine2-Leaf P2P | 10.0.2.0/24 | -| MLAG iBGP P2P | 10.0.3.0/24 | -| MLAG Peer VLAN | 10.0.199.0/24 | -| Loopback0 (Router-ID) | 10.0.250.0/24 | -| Loopback1 (VTEP) | 10.0.255.0/24 | - -## Idempotency - -The script is idempotent - running it multiple times will not create duplicate objects. - -## Reference - -- [NetBox Data Model Documentation](../docs/netbox-data-model.md) diff --git a/scripts/provision_fabric.py b/scripts/provision_fabric.py deleted file mode 100644 index 1c9b229..0000000 --- a/scripts/provision_fabric.py +++ /dev/null @@ -1,1064 +0,0 @@ -#!/usr/bin/env python3 -""" -NetBox Provisioning Script for EVPN-VXLAN Fabric - -This script populates NetBox with the complete fabric topology defined in -arista-evpn-vxlan-clab, including devices, interfaces, cables, IP addresses, -VLANs, and custom fields required for fabric orchestration. - -Requirements: - pip install pynetbox - -Usage: - export NETBOX_URL="http://netbox.example.com" - export NETBOX_TOKEN="your-api-token" - python scripts/provision_fabric.py - -Reference: - https://gitea.arnodo.fr/Damien/fabric-orchestrator/src/branch/main/docs/netbox-data-model.md - https://gitea.arnodo.fr/Damien/arista-evpn-vxlan-clab -""" - -import os -import sys -from typing import Any - -try: - import pynetbox -except ImportError: - print("Error: pynetbox is required. Install with: pip install pynetbox") - sys.exit(1) - - -# ============================================================================= -# Configuration -# ============================================================================= - -NETBOX_URL = os.environ.get("NETBOX_URL", "http://localhost:8000") -NETBOX_TOKEN = os.environ.get("NETBOX_TOKEN", "") - -# Fabric configuration -SITE_NAME = "evpn-lab" -SITE_SLUG = "evpn-lab" - -MANUFACTURER_NAME = "Arista" -MANUFACTURER_SLUG = "arista" - -# Device type from community library -# https://github.com/netbox-community/devicetype-library/tree/master/device-types/Arista -DEVICE_TYPE_MODEL = "cEOS-lab" -DEVICE_TYPE_SLUG = "ceos-lab" - -# ============================================================================= -# Custom Fields Definition -# ============================================================================= - -CUSTOM_FIELDS = [ - # Device custom fields - { - "object_types": ["dcim.device"], - "name": "asn", - "label": "ASN", - "type": "integer", - "required": False, - "description": "BGP Autonomous System Number assigned to this device", - }, - { - "object_types": ["dcim.device"], - "name": "mlag_domain_id", - "label": "MLAG Domain ID", - "type": "text", - "required": False, - "description": "MLAG domain identifier", - }, - { - "object_types": ["dcim.device"], - "name": "mlag_peer_address", - "label": "MLAG Peer Address", - "type": "text", - "required": False, - "description": "MLAG peer IP address", - }, - { - "object_types": ["dcim.device"], - "name": "mlag_local_address", - "label": "MLAG Local Address", - "type": "text", - "required": False, - "description": "MLAG local IP address", - }, - { - "object_types": ["dcim.device"], - "name": "mlag_virtual_mac", - "label": "MLAG Virtual MAC", - "type": "text", - "required": False, - "description": "Shared virtual-router MAC address", - }, - # Interface custom fields - { - "object_types": ["dcim.interface"], - "name": "mlag_peer_link", - "label": "MLAG Peer Link", - "type": "boolean", - "required": False, - "description": "Marks interface as MLAG peer-link", - }, - { - "object_types": ["dcim.interface"], - "name": "mlag_id", - "label": "MLAG ID", - "type": "integer", - "required": False, - "description": "MLAG port-channel ID for host-facing LAGs", - }, - # VRF custom fields - { - "object_types": ["ipam.vrf"], - "name": "l3vni", - "label": "L3 VNI", - "type": "integer", - "required": False, - "description": "Layer 3 VNI for EVPN symmetric IRB", - }, - { - "object_types": ["ipam.vrf"], - "name": "vrf_vlan", - "label": "VRF VLAN", - "type": "integer", - "required": False, - "description": "VLAN ID used for L3 VNI SVI", - }, - # IP Address custom fields - { - "object_types": ["ipam.ipaddress"], - "name": "virtual_ip", - "label": "Virtual IP", - "type": "boolean", - "required": False, - "description": "Marks IP as anycast/virtual IP (shared across MLAG pair)", - }, -] - -# ============================================================================= -# Device Definitions -# ============================================================================= - -DEVICE_ROLES = [ - {"name": "Spine", "slug": "spine", "color": "ff5722"}, - {"name": "Leaf", "slug": "leaf", "color": "4caf50"}, - {"name": "Server", "slug": "server", "color": "2196f3"}, -] - -# Spine devices -SPINES = [ - {"name": "spine1", "mgmt_ip": "172.16.0.1/24", "asn": 65000, "loopback0": "10.0.250.1/32"}, - {"name": "spine2", "mgmt_ip": "172.16.0.2/24", "asn": 65000, "loopback0": "10.0.250.2/32"}, -] - -# Leaf devices with MLAG configuration -LEAFS = [ - { - "name": "leaf1", - "mgmt_ip": "172.16.0.25/24", - "asn": 65001, - "loopback0": "10.0.250.11/32", - "loopback1": "10.0.255.11/32", # Shared VTEP with leaf2 - "mlag": { - "domain_id": "MLAG1", - "local_address": "10.0.199.254", - "peer_address": "10.0.199.255", - "virtual_mac": "00:1c:73:00:00:01", - }, - }, - { - "name": "leaf2", - "mgmt_ip": "172.16.0.50/24", - "asn": 65001, - "loopback0": "10.0.250.12/32", - "loopback1": "10.0.255.11/32", # Shared VTEP with leaf1 - "mlag": { - "domain_id": "MLAG1", - "local_address": "10.0.199.255", - "peer_address": "10.0.199.254", - "virtual_mac": "00:1c:73:00:00:01", - }, - }, - { - "name": "leaf3", - "mgmt_ip": "172.16.0.27/24", - "asn": 65002, - "loopback0": "10.0.250.13/32", - "loopback1": "10.0.255.12/32", - "mlag": { - "domain_id": "MLAG2", - "local_address": "10.0.199.252", - "peer_address": "10.0.199.253", - "virtual_mac": "00:1c:73:00:00:02", - }, - }, - { - "name": "leaf4", - "mgmt_ip": "172.16.0.28/24", - "asn": 65002, - "loopback0": "10.0.250.14/32", - "loopback1": "10.0.255.12/32", - "mlag": { - "domain_id": "MLAG2", - "local_address": "10.0.199.253", - "peer_address": "10.0.199.252", - "virtual_mac": "00:1c:73:00:00:02", - }, - }, - { - "name": "leaf5", - "mgmt_ip": "172.16.0.29/24", - "asn": 65003, - "loopback0": "10.0.250.15/32", - "loopback1": "10.0.255.13/32", - "mlag": { - "domain_id": "MLAG3", - "local_address": "10.0.199.250", - "peer_address": "10.0.199.251", - "virtual_mac": "00:1c:73:00:00:03", - }, - }, - { - "name": "leaf6", - "mgmt_ip": "172.16.0.30/24", - "asn": 65003, - "loopback0": "10.0.250.16/32", - "loopback1": "10.0.255.13/32", - "mlag": { - "domain_id": "MLAG3", - "local_address": "10.0.199.251", - "peer_address": "10.0.199.250", - "virtual_mac": "00:1c:73:00:00:03", - }, - }, - { - "name": "leaf7", - "mgmt_ip": "172.16.0.31/24", - "asn": 65004, - "loopback0": "10.0.250.17/32", - "loopback1": "10.0.255.14/32", - "mlag": { - "domain_id": "MLAG4", - "local_address": "10.0.199.248", - "peer_address": "10.0.199.249", - "virtual_mac": "00:1c:73:00:00:04", - }, - }, - { - "name": "leaf8", - "mgmt_ip": "172.16.0.32/24", - "asn": 65004, - "loopback0": "10.0.250.18/32", - "loopback1": "10.0.255.14/32", - "mlag": { - "domain_id": "MLAG4", - "local_address": "10.0.199.249", - "peer_address": "10.0.199.248", - "virtual_mac": "00:1c:73:00:00:04", - }, - }, -] - -# Host devices -HOSTS = [ - {"name": "host1", "mgmt_ip": "172.16.0.101/24", "vlan": 40, "ip": "10.40.40.101/24"}, - {"name": "host2", "mgmt_ip": "172.16.0.102/24", "vlan": 34, "ip": "10.34.34.102/24"}, - {"name": "host3", "mgmt_ip": "172.16.0.103/24", "vlan": 40, "ip": "10.40.40.103/24"}, - {"name": "host4", "mgmt_ip": "172.16.0.104/24", "vlan": 78, "ip": "10.78.78.104/24"}, -] - -# ============================================================================= -# Cabling Matrix -# ============================================================================= - -# Spine to Leaf connections: (spine_name, spine_intf, leaf_name, leaf_intf) -SPINE_LEAF_CABLES = [ - # Spine1 connections - ("spine1", "Ethernet1", "leaf1", "Ethernet11"), - ("spine1", "Ethernet2", "leaf2", "Ethernet11"), - ("spine1", "Ethernet3", "leaf3", "Ethernet11"), - ("spine1", "Ethernet4", "leaf4", "Ethernet11"), - ("spine1", "Ethernet5", "leaf5", "Ethernet11"), - ("spine1", "Ethernet6", "leaf6", "Ethernet11"), - ("spine1", "Ethernet7", "leaf7", "Ethernet11"), - ("spine1", "Ethernet8", "leaf8", "Ethernet11"), - # Spine2 connections - ("spine2", "Ethernet1", "leaf1", "Ethernet12"), - ("spine2", "Ethernet2", "leaf2", "Ethernet12"), - ("spine2", "Ethernet3", "leaf3", "Ethernet12"), - ("spine2", "Ethernet4", "leaf4", "Ethernet12"), - ("spine2", "Ethernet5", "leaf5", "Ethernet12"), - ("spine2", "Ethernet6", "leaf6", "Ethernet12"), - ("spine2", "Ethernet7", "leaf7", "Ethernet12"), - ("spine2", "Ethernet8", "leaf8", "Ethernet12"), -] - -# MLAG Peer-link cables: (leaf_a, intf_a, leaf_b, intf_b) -MLAG_PEER_CABLES = [ - ("leaf1", "Ethernet10", "leaf2", "Ethernet10"), - ("leaf3", "Ethernet10", "leaf4", "Ethernet10"), - ("leaf5", "Ethernet10", "leaf6", "Ethernet10"), - ("leaf7", "Ethernet10", "leaf8", "Ethernet10"), -] - -# Host dual-homing cables: (leaf_a, intf_a, leaf_b, intf_b, host, host_intf_a, host_intf_b) -HOST_CABLES = [ - ("leaf1", "Ethernet1", "leaf2", "Ethernet1", "host1"), - ("leaf3", "Ethernet1", "leaf4", "Ethernet1", "host2"), - ("leaf5", "Ethernet1", "leaf6", "Ethernet1", "host3"), - ("leaf7", "Ethernet1", "leaf8", "Ethernet1", "host4"), -] - -# ============================================================================= -# IP Addressing for P2P Links -# ============================================================================= - -# Spine1 P2P addresses: leaf_name -> (spine_ip, leaf_ip) -SPINE1_P2P = { - "leaf1": ("10.0.1.0/31", "10.0.1.1/31"), - "leaf2": ("10.0.1.2/31", "10.0.1.3/31"), - "leaf3": ("10.0.1.4/31", "10.0.1.5/31"), - "leaf4": ("10.0.1.6/31", "10.0.1.7/31"), - "leaf5": ("10.0.1.8/31", "10.0.1.9/31"), - "leaf6": ("10.0.1.10/31", "10.0.1.11/31"), - "leaf7": ("10.0.1.12/31", "10.0.1.13/31"), - "leaf8": ("10.0.1.14/31", "10.0.1.15/31"), -} - -SPINE2_P2P = { - "leaf1": ("10.0.2.0/31", "10.0.2.1/31"), - "leaf2": ("10.0.2.2/31", "10.0.2.3/31"), - "leaf3": ("10.0.2.4/31", "10.0.2.5/31"), - "leaf4": ("10.0.2.6/31", "10.0.2.7/31"), - "leaf5": ("10.0.2.8/31", "10.0.2.9/31"), - "leaf6": ("10.0.2.10/31", "10.0.2.11/31"), - "leaf7": ("10.0.2.12/31", "10.0.2.13/31"), - "leaf8": ("10.0.2.14/31", "10.0.2.15/31"), -} - -# MLAG iBGP P2P addresses: (leaf_a, leaf_b) -> (leaf_a_ip, leaf_b_ip) -MLAG_IBGP_P2P = { - ("leaf1", "leaf2"): ("10.0.3.0/31", "10.0.3.1/31"), - ("leaf3", "leaf4"): ("10.0.3.2/31", "10.0.3.3/31"), - ("leaf5", "leaf6"): ("10.0.3.4/31", "10.0.3.5/31"), - ("leaf7", "leaf8"): ("10.0.3.6/31", "10.0.3.7/31"), -} - -# ============================================================================= -# VLANs and VRFs -# ============================================================================= - -VLAN_GROUP_NAME = "evpn-fabric" - -VLANS = [ - {"vid": 34, "name": "vlan34-l3", "description": "L3 VLAN for VRF gold"}, - {"vid": 40, "name": "vlan40-l2", "description": "L2 VXLAN stretched VLAN"}, - {"vid": 78, "name": "vlan78-l3", "description": "L3 VLAN for VRF gold"}, - {"vid": 4090, "name": "mlag-peer", "description": "MLAG peer communication"}, - {"vid": 4091, "name": "mlag-ibgp", "description": "iBGP between MLAG peers"}, -] - -VRFS = [ - { - "name": "gold", - "rd": "1:100001", - "l3vni": 100001, - "vrf_vlan": 3000, - "import_targets": ["1:100001"], - "export_targets": ["1:100001"], - }, -] - -# ============================================================================= -# Prefixes -# ============================================================================= - -PREFIXES = [ - {"prefix": "10.0.1.0/24", "description": "Spine1-Leaf P2P"}, - {"prefix": "10.0.2.0/24", "description": "Spine2-Leaf P2P"}, - {"prefix": "10.0.3.0/24", "description": "MLAG iBGP P2P"}, - {"prefix": "10.0.199.0/24", "description": "MLAG Peer VLAN 4090"}, - {"prefix": "10.0.250.0/24", "description": "Loopback0 (Router-ID)"}, - {"prefix": "10.0.255.0/24", "description": "Loopback1 (VTEP)"}, - {"prefix": "10.34.34.0/24", "description": "VLAN 34 subnet", "vrf": "gold"}, - {"prefix": "10.40.40.0/24", "description": "VLAN 40 subnet"}, - {"prefix": "10.78.78.0/24", "description": "VLAN 78 subnet", "vrf": "gold"}, - {"prefix": "172.16.0.0/24", "description": "Management network"}, -] - - -# ============================================================================= -# Helper Functions -# ============================================================================= - - -def get_or_create(endpoint, search_params: dict, create_params: dict) -> Any: - """Get existing object or create new one.""" - # Use filter() instead of get() to handle multiple results gracefully - results = list(endpoint.filter(**search_params)) - if results: - return results[0], False - - obj = endpoint.create(create_params) - return obj, True - - -def log_result(action: str, obj_type: str, name: str, created: bool): - """Log creation result.""" - status = "Created" if created else "Exists" - print(f" [{status}] {obj_type}: {name}") - - -# ============================================================================= -# Provisioning Functions -# ============================================================================= - - -def create_custom_fields(nb: pynetbox.api): - """Create custom fields for fabric orchestration.""" - print("\n=== Creating Custom Fields ===") - - for cf in CUSTOM_FIELDS: - existing = nb.extras.custom_fields.get(name=cf["name"]) - if existing: - print(f" [Exists] Custom Field: {cf['name']}") - continue - - nb.extras.custom_fields.create(cf) - print(f" [Created] Custom Field: {cf['name']}") - - -def create_organization(nb: pynetbox.api) -> dict: - """Create site, manufacturer, device types, and roles.""" - print("\n=== Creating Organization ===") - result = {} - - # Site - site, created = get_or_create( - nb.dcim.sites, - {"slug": SITE_SLUG}, - {"name": SITE_NAME, "status": "active"}, - ) - log_result("Site", "Site", SITE_NAME, created) - result["site"] = site - - # Manufacturer - manufacturer, created = get_or_create( - nb.dcim.manufacturers, - {"slug": MANUFACTURER_SLUG}, - {"name": MANUFACTURER_NAME}, - ) - log_result("Manufacturer", "Manufacturer", MANUFACTURER_NAME, created) - result["manufacturer"] = manufacturer - - # Device Type for switches - device_type, created = get_or_create( - nb.dcim.device_types, - {"slug": DEVICE_TYPE_SLUG}, - { - "model": DEVICE_TYPE_MODEL, - "manufacturer": manufacturer.id, - "u_height": 1, - }, - ) - log_result("DeviceType", "DeviceType", DEVICE_TYPE_MODEL, created) - result["device_type"] = device_type - - # Device Type for servers/hosts - server_type, created = get_or_create( - nb.dcim.device_types, - {"slug": "linux-server"}, - { - "model": "Linux Server", - "manufacturer": manufacturer.id, - "u_height": 1, - }, - ) - log_result("DeviceType", "DeviceType", "Linux Server", created) - result["server_type"] = server_type - - # Device Roles - result["roles"] = {} - for role in DEVICE_ROLES: - role_obj, created = get_or_create( - nb.dcim.device_roles, - {"slug": role["slug"]}, - {"name": role["name"], "color": role["color"]}, - ) - log_result("DeviceRole", "DeviceRole", role["name"], created) - result["roles"][role["slug"]] = role_obj - - return result - - -def create_vlans(nb: pynetbox.api, site) -> dict: - """Create VLAN group and VLANs.""" - print("\n=== Creating VLANs ===") - result = {} - - # VLAN Group - # For NetBox 4.x, scope_type/scope_id are deprecated in favor of scope_object - # but for simple site scope we can just pass scope_type and scope_id or - # rely on backward compatibility if it exists. Ideally we check NetBox version - # but here we'll try to be compatible. - vlan_group, created = get_or_create( - nb.ipam.vlan_groups, - {"slug": VLAN_GROUP_NAME}, - {"name": VLAN_GROUP_NAME, "scope_type": "dcim.site", "scope_id": site.id}, - ) - log_result("VLANGroup", "VLANGroup", VLAN_GROUP_NAME, created) - result["group"] = vlan_group - - # VLANs - result["vlans"] = {} - for vlan in VLANS: - vlan_obj, created = get_or_create( - nb.ipam.vlans, - {"vid": vlan["vid"], "group_id": vlan_group.id}, - { - "vid": vlan["vid"], - "name": vlan["name"], - "description": vlan.get("description", ""), - "group": vlan_group.id, # Create expects 'group', filter expects 'group_id' - }, - ) - log_result("VLAN", "VLAN", f"{vlan['vid']} ({vlan['name']})", created) - result["vlans"][vlan["vid"]] = vlan_obj - - return result - - -def create_vrfs(nb: pynetbox.api) -> dict: - """Create VRFs with route targets.""" - print("\n=== Creating VRFs ===") - result = {} - - for vrf_def in VRFS: - # Create route targets first - import_rts = [] - export_rts = [] - - for rt in vrf_def.get("import_targets", []): - rt_obj, created = get_or_create( - nb.ipam.route_targets, - {"name": rt}, - {"name": rt, "description": f"Import RT for {vrf_def['name']}"}, - ) - import_rts.append(rt_obj.id) - log_result("RouteTarget", "RouteTarget", rt, created) - - for rt in vrf_def.get("export_targets", []): - rt_obj = nb.ipam.route_targets.get(name=rt) - if rt_obj: - export_rts.append(rt_obj.id) - - # Create VRF - vrf, created = get_or_create( - nb.ipam.vrfs, - {"name": vrf_def["name"]}, - { - "name": vrf_def["name"], - "rd": vrf_def.get("rd"), - "import_targets": import_rts, - "export_targets": export_rts, - "custom_fields": { - "l3vni": vrf_def.get("l3vni"), - "vrf_vlan": vrf_def.get("vrf_vlan"), - }, - }, - ) - log_result("VRF", "VRF", vrf_def["name"], created) - result[vrf_def["name"]] = vrf - - return result - - -def create_prefixes(nb: pynetbox.api, vrfs: dict): - """Create IP prefixes.""" - print("\n=== Creating Prefixes ===") - - for prefix_def in PREFIXES: - vrf_id = None - if "vrf" in prefix_def: - vrf_id = vrfs.get(prefix_def["vrf"]) - if vrf_id: - vrf_id = vrf_id.id - - create_params = { - "prefix": prefix_def["prefix"], - "description": prefix_def.get("description", ""), - } - if vrf_id: - create_params["vrf"] = vrf_id - - prefix, created = get_or_create( - nb.ipam.prefixes, - {"prefix": prefix_def["prefix"], "vrf_id": vrf_id}, - create_params, - ) - log_result("Prefix", "Prefix", prefix_def["prefix"], created) - - -def create_devices(nb: pynetbox.api, org: dict) -> dict: - """Create all network devices.""" - print("\n=== Creating Devices ===") - result = {} - - # Create Spines - for spine in SPINES: - device, created = get_or_create( - nb.dcim.devices, - {"name": spine["name"]}, - { - "name": spine["name"], - "device_type": org["device_type"].id, - "role": org["roles"]["spine"].id, - "site": org["site"].id, - "status": "active", - "custom_fields": {"asn": spine["asn"]}, - }, - ) - log_result("Device", "Device", spine["name"], created) - result[spine["name"]] = {"device": device, "config": spine} - - # Create Leafs - for leaf in LEAFS: - mlag = leaf.get("mlag", {}) - custom_fields = { - "asn": leaf["asn"], - "mlag_domain_id": mlag.get("domain_id"), - "mlag_peer_address": mlag.get("peer_address"), - "mlag_local_address": mlag.get("local_address"), - "mlag_virtual_mac": mlag.get("virtual_mac"), - } - - device, created = get_or_create( - nb.dcim.devices, - {"name": leaf["name"]}, - { - "name": leaf["name"], - "device_type": org["device_type"].id, - "role": org["roles"]["leaf"].id, - "site": org["site"].id, - "status": "active", - "custom_fields": custom_fields, - }, - ) - log_result("Device", "Device", leaf["name"], created) - result[leaf["name"]] = {"device": device, "config": leaf} - - # Create Hosts - for host in HOSTS: - device, created = get_or_create( - nb.dcim.devices, - {"name": host["name"]}, - { - "name": host["name"], - "device_type": org["server_type"].id, - "role": org["roles"]["server"].id, - "site": org["site"].id, - "status": "active", - }, - ) - log_result("Device", "Device", host["name"], created) - result[host["name"]] = {"device": device, "config": host} - - return result - - -def create_interfaces(nb: pynetbox.api, devices: dict) -> dict: - """Create all device interfaces.""" - print("\n=== Creating Interfaces ===") - result = {} - - for device_name, device_data in devices.items(): - device = device_data["device"] - config = device_data["config"] - result[device_name] = {} - - # Determine interface requirements based on device type - if device_name.startswith("spine"): - # Spines: 8 Ethernet ports for leaf connections + Management + Loopback0 - interfaces = [ - ("Management1", "virtual"), - ("Loopback0", "virtual"), - ] - for i in range(1, 9): - interfaces.append((f"Ethernet{i}", "1000base-t")) - - elif device_name.startswith("leaf"): - # Leafs: Ethernet1 (host), Ethernet10 (peer-link), Ethernet11-12 (spines) - # + Management + Loopback0 + Loopback1 + Vxlan1 - interfaces = [ - ("Management1", "virtual"), - ("Loopback0", "virtual"), - ("Loopback1", "virtual"), - ("Vxlan1", "virtual"), - ("Ethernet1", "1000base-t"), # Host-facing - ("Ethernet10", "1000base-t"), # MLAG peer-link - ("Ethernet11", "1000base-t"), # Spine1 - ("Ethernet12", "1000base-t"), # Spine2 - ("Port-Channel10", "lag"), # MLAG peer-link LAG - ] - - elif device_name.startswith("host"): - # Hosts: eth1, eth2 for bonding + bond0 - interfaces = [ - ("eth0", "virtual"), # Management - ("eth1", "1000base-t"), - ("eth2", "1000base-t"), - ("bond0", "lag"), - ] - - else: - continue - - for intf_name, intf_type in interfaces: - # Search with device_id but create with device - existing = nb.dcim.interfaces.get(device_id=device.id, name=intf_name) - if existing: - intf = existing - created = False - else: - intf = nb.dcim.interfaces.create( - { - "device": device.id, - "name": intf_name, - "type": intf_type, - } - ) - created = True - - # Set custom fields for MLAG peer-link - if intf_name in ("Ethernet10", "Port-Channel10") and device_name.startswith("leaf"): - if created or not intf.custom_fields.get("mlag_peer_link"): - intf.custom_fields = {"mlag_peer_link": True} - intf.save() - - result[device_name][intf_name] = intf - - if created: - log_result("Interface", "Interfaces", f"{device_name}", True) - - return result - - -def create_ip_addresses(nb: pynetbox.api, devices: dict, interfaces: dict, vrfs: dict): - """Create IP addresses and assign to interfaces.""" - print("\n=== Creating IP Addresses ===") - - # Loopback addresses for spines - for spine in SPINES: - device = devices[spine["name"]]["device"] - intf = interfaces[spine["name"]]["Loopback0"] - - ip, created = get_or_create( - nb.ipam.ip_addresses, - { - "address": spine["loopback0"], - "assigned_object_id": intf.id, - }, - { - "address": spine["loopback0"], - "role": "loopback", - "assigned_object_type": "dcim.interface", - "assigned_object_id": intf.id, - "description": f"{spine['name']} Router-ID", - }, - ) - if not created: - current_role = ip.role.value if hasattr(ip.role, "value") else ip.role - if current_role != "loopback": - ip.role = "loopback" - ip.save() - log_result("IP", "IP Address", f"{spine['loopback0']} (updated role)", True) - - log_result("IP", "IP Address", spine["loopback0"], created) - - # Loopback addresses for leafs - for leaf in LEAFS: - device = devices[leaf["name"]]["device"] - - # Loopback0 - intf = interfaces[leaf["name"]]["Loopback0"] - ip, created = get_or_create( - nb.ipam.ip_addresses, - { - "address": leaf["loopback0"], - "assigned_object_id": intf.id, - }, - { - "address": leaf["loopback0"], - "role": "loopback", - "assigned_object_type": "dcim.interface", - "assigned_object_id": intf.id, - "description": f"{leaf['name']} Router-ID", - }, - ) - if not created: - current_role = ip.role.value if hasattr(ip.role, "value") else ip.role - if current_role != "loopback": - ip.role = "loopback" - ip.save() - log_result("IP", "IP Address", f"{leaf['loopback0']} (updated role)", True) - - log_result("IP", "IP Address", leaf["loopback0"], created) - - # Loopback1 (VTEP) - intf = interfaces[leaf["name"]]["Loopback1"] - ip, created = get_or_create( - nb.ipam.ip_addresses, - { - "address": leaf["loopback1"], - "assigned_object_id": intf.id, - }, - { - "address": leaf["loopback1"], - "role": "anycast", - "assigned_object_type": "dcim.interface", - "assigned_object_id": intf.id, - "description": f"{leaf['name']} VTEP", - }, - ) - if not created: - current_role = ip.role.value if hasattr(ip.role, "value") else ip.role - if current_role != "anycast": - ip.role = "anycast" - ip.save() - log_result("IP", "IP Address", f"{leaf['loopback1']} (updated role)", True) - - log_result("IP", "IP Address", leaf["loopback1"], created) - - # P2P addresses for Spine1-Leaf links - for leaf_name, (spine_ip, leaf_ip) in SPINE1_P2P.items(): - # Spine side - spine_intf = interfaces["spine1"][f"Ethernet{list(SPINE1_P2P.keys()).index(leaf_name) + 1}"] - ip, created = get_or_create( - nb.ipam.ip_addresses, - { - "address": spine_ip, - "assigned_object_id": spine_intf.id, - }, - { - "address": spine_ip, - "assigned_object_type": "dcim.interface", - "assigned_object_id": spine_intf.id, - "description": f"spine1 to {leaf_name}", - }, - ) - log_result("IP", "IP Address", spine_ip, created) - - # Leaf side - leaf_intf = interfaces[leaf_name]["Ethernet11"] - ip, created = get_or_create( - nb.ipam.ip_addresses, - { - "address": leaf_ip, - "assigned_object_id": leaf_intf.id, - }, - { - "address": leaf_ip, - "assigned_object_type": "dcim.interface", - "assigned_object_id": leaf_intf.id, - "description": f"{leaf_name} to spine1", - }, - ) - log_result("IP", "IP Address", leaf_ip, created) - - # P2P addresses for Spine2-Leaf links - for leaf_name, (spine_ip, leaf_ip) in SPINE2_P2P.items(): - spine_intf = interfaces["spine2"][f"Ethernet{list(SPINE2_P2P.keys()).index(leaf_name) + 1}"] - ip, created = get_or_create( - nb.ipam.ip_addresses, - { - "address": spine_ip, - "assigned_object_id": spine_intf.id, - }, - { - "address": spine_ip, - "assigned_object_type": "dcim.interface", - "assigned_object_id": spine_intf.id, - "description": f"spine2 to {leaf_name}", - }, - ) - log_result("IP", "IP Address", spine_ip, created) - - leaf_intf = interfaces[leaf_name]["Ethernet12"] - ip, created = get_or_create( - nb.ipam.ip_addresses, - { - "address": leaf_ip, - "assigned_object_id": leaf_intf.id, - }, - { - "address": leaf_ip, - "assigned_object_type": "dcim.interface", - "assigned_object_id": leaf_intf.id, - "description": f"{leaf_name} to spine2", - }, - ) - log_result("IP", "IP Address", leaf_ip, created) - - -def create_cables(nb: pynetbox.api, interfaces: dict): - """Create cables between devices.""" - print("\n=== Creating Cables ===") - - # Spine-Leaf cables - for spine, spine_intf, leaf, leaf_intf in SPINE_LEAF_CABLES: - a_intf = interfaces.get(spine, {}).get(spine_intf) - b_intf = interfaces.get(leaf, {}).get(leaf_intf) - - if not a_intf or not b_intf: - print( - f" [Skip] Cable: {spine}:{spine_intf} <-> {leaf}:{leaf_intf} (interface not found)" - ) - continue - - # Check if cable already exists - existing = nb.dcim.cables.filter(termination_a_id=a_intf.id) - if list(existing): - print(f" [Exists] Cable: {spine}:{spine_intf} <-> {leaf}:{leaf_intf}") - continue - - try: - cable = nb.dcim.cables.create( - { - "a_terminations": [{"object_type": "dcim.interface", "object_id": a_intf.id}], - "b_terminations": [{"object_type": "dcim.interface", "object_id": b_intf.id}], - "status": "connected", - "type": "cat6a", - } - ) - print(f" [Created] Cable: {spine}:{spine_intf} <-> {leaf}:{leaf_intf}") - except Exception as e: - print(f" [Error] Cable: {spine}:{spine_intf} <-> {leaf}:{leaf_intf}: {e}") - - # MLAG peer-link cables - for leaf_a, intf_a, leaf_b, intf_b in MLAG_PEER_CABLES: - a_intf = interfaces.get(leaf_a, {}).get(intf_a) - b_intf = interfaces.get(leaf_b, {}).get(intf_b) - - if not a_intf or not b_intf: - print(f" [Skip] Cable: {leaf_a}:{intf_a} <-> {leaf_b}:{intf_b} (interface not found)") - continue - - existing = nb.dcim.cables.filter(termination_a_id=a_intf.id) - if list(existing): - print(f" [Exists] Cable: {leaf_a}:{intf_a} <-> {leaf_b}:{intf_b}") - continue - - try: - cable = nb.dcim.cables.create( - { - "a_terminations": [{"object_type": "dcim.interface", "object_id": a_intf.id}], - "b_terminations": [{"object_type": "dcim.interface", "object_id": b_intf.id}], - "status": "connected", - "type": "cat6a", - "label": "MLAG Peer-Link", - } - ) - print(f" [Created] Cable: {leaf_a}:{intf_a} <-> {leaf_b}:{intf_b} (MLAG peer-link)") - except Exception as e: - print(f" [Error] Cable: {leaf_a}:{intf_a} <-> {leaf_b}:{intf_b}: {e}") - - # Host dual-homing cables - for leaf_a, intf_a, leaf_b, intf_b, host in HOST_CABLES: - # Cable from leaf_a to host eth1 - a_intf = interfaces.get(leaf_a, {}).get(intf_a) - host_eth1 = interfaces.get(host, {}).get("eth1") - - if a_intf and host_eth1: - existing = nb.dcim.cables.filter(termination_a_id=a_intf.id) - if not list(existing): - try: - cable = nb.dcim.cables.create( - { - "a_terminations": [ - {"object_type": "dcim.interface", "object_id": a_intf.id} - ], - "b_terminations": [ - {"object_type": "dcim.interface", "object_id": host_eth1.id} - ], - "status": "connected", - "type": "cat6a", - } - ) - print(f" [Created] Cable: {leaf_a}:{intf_a} <-> {host}:eth1") - except Exception as e: - print(f" [Error] Cable: {leaf_a}:{intf_a} <-> {host}:eth1: {e}") - else: - print(f" [Exists] Cable: {leaf_a}:{intf_a} <-> {host}:eth1") - - # Cable from leaf_b to host eth2 - b_intf = interfaces.get(leaf_b, {}).get(intf_b) - host_eth2 = interfaces.get(host, {}).get("eth2") - - if b_intf and host_eth2: - existing = nb.dcim.cables.filter(termination_a_id=b_intf.id) - if not list(existing): - try: - cable = nb.dcim.cables.create( - { - "a_terminations": [ - {"object_type": "dcim.interface", "object_id": b_intf.id} - ], - "b_terminations": [ - {"object_type": "dcim.interface", "object_id": host_eth2.id} - ], - "status": "connected", - "type": "cat6a", - } - ) - print(f" [Created] Cable: {leaf_b}:{intf_b} <-> {host}:eth2") - except Exception as e: - print(f" [Error] Cable: {leaf_b}:{intf_b} <-> {host}:eth2: {e}") - else: - print(f" [Exists] Cable: {leaf_b}:{intf_b} <-> {host}:eth2") - - -# ============================================================================= -# Main -# ============================================================================= - - -def main(): - """Main provisioning function.""" - if not NETBOX_TOKEN: - print("Error: NETBOX_TOKEN environment variable is required") - sys.exit(1) - - print(f"Connecting to NetBox at {NETBOX_URL}...") - nb = pynetbox.api(NETBOX_URL, token=NETBOX_TOKEN) - - try: - # Verify connection - status = nb.status() - print(f"Connected to NetBox {status.get('netbox-version', 'unknown')}") - except Exception as e: - print(f"Error connecting to NetBox: {e}") - sys.exit(1) - - # Run provisioning steps - create_custom_fields(nb) - org = create_organization(nb) - vlans = create_vlans(nb, org["site"]) - vrfs = create_vrfs(nb) - create_prefixes(nb, vrfs) - devices = create_devices(nb, org) - interfaces = create_interfaces(nb, devices) - create_ip_addresses(nb, devices, interfaces, vrfs) - create_cables(nb, interfaces) - - print("\n" + "=" * 50) - print("Provisioning complete!") - print("=" * 50) - - -if __name__ == "__main__": - main() diff --git a/src/netbox/__init__.py b/src/netbox/__init__.py deleted file mode 100644 index 5b9aae4..0000000 --- a/src/netbox/__init__.py +++ /dev/null @@ -1,36 +0,0 @@ -""" -NetBox Client Module for Fabric Orchestrator. - -This module provides a NetBox API client for fetching fabric intent data -using pynetbox. - -Usage: - from src.netbox import FabricNetBoxClient - - client = FabricNetBoxClient( - url="http://netbox.local", - token="your-token" - ) - - # Get all leaf devices - leaves = client.get_leaf_devices() - - # Get complete intent for a device - intent = client.get_device_intent("leaf1") -""" - -from .client import ( - FabricNetBoxClient, - NetBoxAPIError, - NetBoxConnectionError, - NetBoxError, - NetBoxNotFoundError, -) - -__all__ = [ - "FabricNetBoxClient", - "NetBoxError", - "NetBoxConnectionError", - "NetBoxNotFoundError", - "NetBoxAPIError", -] diff --git a/src/netbox/client.py b/src/netbox/client.py deleted file mode 100644 index 27f1d83..0000000 --- a/src/netbox/client.py +++ /dev/null @@ -1,855 +0,0 @@ -""" -NetBox API Client for Fabric Orchestrator. - -This module provides a client for fetching fabric intent data from NetBox -using pynetbox. It retrieves device, interface, BGP, EVPN, and VRF configuration -to determine the desired network state. - -Usage: - from src.netbox import FabricNetBoxClient - - client = FabricNetBoxClient( - url="http://netbox.local", - token="your-token" - ) - - # Get all leaf devices - leaves = client.get_leaf_devices() - - # Get complete intent for a device - intent = client.get_device_intent("leaf1") -""" - -from __future__ import annotations - -import logging -import os -from typing import Any - -import pynetbox -from pynetbox.core.query import RequestError - -# ============================================================================= -# Exceptions -# ============================================================================= - - -class NetBoxError(Exception): - """Base exception for NetBox operations.""" - - pass - - -class NetBoxConnectionError(NetBoxError): - """Raised when connection to NetBox fails.""" - - pass - - -class NetBoxNotFoundError(NetBoxError): - """Raised when requested resource is not found.""" - - pass - - -class NetBoxAPIError(NetBoxError): - """Raised when NetBox API returns an error.""" - - pass - - -# ============================================================================= -# Logger -# ============================================================================= - -logger = logging.getLogger(__name__) - - -# ============================================================================= -# NetBox Client -# ============================================================================= - - -class FabricNetBoxClient: - """ - NetBox API client for fabric intent retrieval. - - Provides methods to fetch device, interface, BGP, EVPN, and VRF - configuration from NetBox to determine desired network state. - - Supports configuration via environment variables: - - NETBOX_URL: NetBox server URL - - NETBOX_TOKEN: API token for authentication - - Example: - client = FabricNetBoxClient( - url="http://netbox.local", - token="your-token" - ) - leaves = client.get_leaf_devices() - """ - - def __init__( - self, - url: str | None = None, - token: str | None = None, - threading: bool = False, - ): - """ - Initialize NetBox client. - - Args: - url: NetBox server URL (or NETBOX_URL env var) - token: API token (or NETBOX_TOKEN env var) - threading: Enable threading for requests (default: False) - - Raises: - NetBoxConnectionError: If URL or token is not provided - """ - self.url = url or os.environ.get("NETBOX_URL") - self.token = token or os.environ.get("NETBOX_TOKEN") - - if not self.url: - raise NetBoxConnectionError( - "NetBox URL not provided. Set NETBOX_URL environment variable or pass url parameter." - ) - if not self.token: - raise NetBoxConnectionError( - "NetBox token not provided. Set NETBOX_TOKEN environment variable or pass token parameter." - ) - - logger.debug(f"Connecting to NetBox at {self.url}") - - try: - self._api = pynetbox.api(self.url, token=self.token, threading=threading) - # Test connection by fetching API status - self._api.status() - logger.info(f"Successfully connected to NetBox at {self.url}") - except Exception as e: - raise NetBoxConnectionError(f"Failed to connect to NetBox at {self.url}: {e}") from e - - # ========================================================================= - # Device Methods - # ========================================================================= - - def get_device(self, name: str) -> dict[str, Any]: - """ - Get a device by name. - - Args: - name: Device name - - Returns: - Device data as dictionary - - Raises: - NetBoxNotFoundError: If device not found - """ - logger.debug(f"Fetching device: {name}") - try: - device = self._api.dcim.devices.get(name=name) - if not device: - raise NetBoxNotFoundError(f"Device not found: {name}") - return dict(device) - except RequestError as e: - raise NetBoxAPIError(f"Failed to fetch device {name}: {e}") from e - - def get_devices_by_role(self, role: str) -> list[dict[str, Any]]: - """ - Get all devices with a specific role. - - Args: - role: Device role slug (e.g., 'leaf', 'spine') - - Returns: - List of device dictionaries - """ - logger.debug(f"Fetching devices with role: {role}") - try: - devices = self._api.dcim.devices.filter(role=role) - return [dict(d) for d in devices] - except RequestError as e: - raise NetBoxAPIError(f"Failed to fetch devices with role {role}: {e}") from e - - def get_leaf_devices(self) -> list[dict[str, Any]]: - """ - Get all leaf devices. - - Returns: - List of leaf device dictionaries - """ - return self.get_devices_by_role("leaf") - - def get_spine_devices(self) -> list[dict[str, Any]]: - """ - Get all spine devices. - - Returns: - List of spine device dictionaries - """ - return self.get_devices_by_role("spine") - - def get_device_asn(self, device_name: str) -> int | None: - """ - Get ASN assigned to a device from custom field. - - Note: NetBox 4.4 does not support filtering ASNs by device. - ASN is stored as a custom field on the Device object. - - Args: - device_name: Device name - - Returns: - ASN number or None if not assigned - """ - logger.debug(f"Fetching ASN for device: {device_name}") - device = self.get_device(device_name) - return device.get("custom_fields", {}).get("asn") - - # ========================================================================= - # Interface Methods - # ========================================================================= - - def get_device_interfaces( - self, device_name: str, interface_type: str | None = None - ) -> list[dict[str, Any]]: - """ - Get interfaces for a device. - - Args: - device_name: Device name - interface_type: Optional interface type filter (e.g., 'virtual', 'lag', '1000base-t') - - Returns: - List of interface dictionaries - """ - logger.debug(f"Fetching interfaces for device: {device_name}, type: {interface_type}") - try: - filters = {"device": device_name} - if interface_type: - filters["type"] = interface_type - interfaces = self._api.dcim.interfaces.filter(**filters) - return [dict(i) for i in interfaces] - except RequestError as e: - raise NetBoxAPIError(f"Failed to fetch interfaces for {device_name}: {e}") from e - - def get_device_loopbacks(self, device_name: str) -> list[dict[str, Any]]: - """ - Get loopback interfaces for a device. - - Args: - device_name: Device name - - Returns: - List of loopback interface dictionaries - """ - logger.debug(f"Fetching loopbacks for device: {device_name}") - try: - interfaces = self._api.dcim.interfaces.filter(device=device_name, type="virtual") - loopbacks = [dict(i) for i in interfaces if i.name.lower().startswith("loopback")] - return loopbacks - except RequestError as e: - raise NetBoxAPIError(f"Failed to fetch loopbacks for {device_name}: {e}") from e - - def get_device_lags(self, device_name: str) -> list[dict[str, Any]]: - """ - Get LAG/Port-Channel interfaces for a device. - - Args: - device_name: Device name - - Returns: - List of LAG interface dictionaries - """ - logger.debug(f"Fetching LAGs for device: {device_name}") - return self.get_device_interfaces(device_name, interface_type="lag") - - def get_device_svis(self, device_name: str) -> list[dict[str, Any]]: - """ - Get SVI (VLAN) interfaces for a device. - - Args: - device_name: Device name - - Returns: - List of SVI interface dictionaries - """ - logger.debug(f"Fetching SVIs for device: {device_name}") - try: - interfaces = self._api.dcim.interfaces.filter(device=device_name, type="virtual") - svis = [dict(i) for i in interfaces if i.name.lower().startswith("vlan")] - return svis - except RequestError as e: - raise NetBoxAPIError(f"Failed to fetch SVIs for {device_name}: {e}") from e - - # ========================================================================= - # IP Address Methods - # ========================================================================= - - def get_interface_ips( - self, device_name: str, interface_name: str - ) -> list[dict[str, Any]]: - """ - Get IP addresses assigned to an interface. - - Args: - device_name: Device name - interface_name: Interface name - - Returns: - List of IP address dictionaries - """ - logger.debug(f"Fetching IPs for {device_name}:{interface_name}") - try: - ips = self._api.ipam.ip_addresses.filter( - device=device_name, interface=interface_name - ) - return [dict(ip) for ip in ips] - except RequestError as e: - raise NetBoxAPIError( - f"Failed to fetch IPs for {device_name}:{interface_name}: {e}" - ) from e - - def get_device_ips(self, device_name: str) -> list[dict[str, Any]]: - """ - Get all IP addresses for a device. - - Args: - device_name: Device name - - Returns: - List of IP address dictionaries with interface info - """ - logger.debug(f"Fetching all IPs for device: {device_name}") - try: - ips = self._api.ipam.ip_addresses.filter(device=device_name) - return [dict(ip) for ip in ips] - except RequestError as e: - raise NetBoxAPIError(f"Failed to fetch IPs for {device_name}: {e}") from e - - # ========================================================================= - # BGP Methods (Plugin) - # ========================================================================= - - def get_bgp_sessions(self, device_name: str | None = None) -> list[dict[str, Any]]: - """ - Get BGP sessions from NetBox BGP plugin. - - Args: - device_name: Optional device name filter - - Returns: - List of BGP session dictionaries - """ - logger.debug(f"Fetching BGP sessions for device: {device_name or 'all'}") - try: - if device_name: - sessions = self._api.plugins.bgp.sessions.filter(device=device_name) - else: - sessions = self._api.plugins.bgp.sessions.all() - return [dict(s) for s in sessions] - except RequestError as e: - raise NetBoxAPIError(f"Failed to fetch BGP sessions: {e}") from e - except AttributeError: - logger.warning("NetBox BGP plugin not available") - return [] - - def get_bgp_peer_groups(self, device_name: str | None = None) -> list[dict[str, Any]]: - """ - Get BGP peer groups from NetBox BGP plugin. - - Args: - device_name: Optional device name filter - - Returns: - List of BGP peer group dictionaries - """ - logger.debug(f"Fetching BGP peer groups for device: {device_name or 'all'}") - try: - if device_name: - peer_groups = self._api.plugins.bgp.peer_groups.filter(device=device_name) - else: - peer_groups = self._api.plugins.bgp.peer_groups.all() - return [dict(pg) for pg in peer_groups] - except RequestError as e: - raise NetBoxAPIError(f"Failed to fetch BGP peer groups: {e}") from e - except AttributeError: - logger.warning("NetBox BGP plugin not available") - return [] - - # ========================================================================= - # VLAN Methods - # ========================================================================= - - def get_vlans(self, site: str | None = None) -> list[dict[str, Any]]: - """ - Get VLANs. - - Args: - site: Optional site filter - - Returns: - List of VLAN dictionaries - """ - logger.debug(f"Fetching VLANs for site: {site or 'all'}") - try: - if site: - vlans = self._api.ipam.vlans.filter(site=site) - else: - vlans = self._api.ipam.vlans.all() - return [dict(v) for v in vlans] - except RequestError as e: - raise NetBoxAPIError(f"Failed to fetch VLANs: {e}") from e - - def get_vlan(self, vlan_id: int, site: str | None = None) -> dict[str, Any] | None: - """ - Get a specific VLAN by ID. - - Args: - vlan_id: VLAN ID - site: Optional site filter - - Returns: - VLAN dictionary or None if not found - """ - logger.debug(f"Fetching VLAN {vlan_id}") - try: - filters = {"vid": vlan_id} - if site: - filters["site"] = site - vlans = self._api.ipam.vlans.filter(**filters) - vlan_list = list(vlans) - if vlan_list: - return dict(vlan_list[0]) - return None - except RequestError as e: - raise NetBoxAPIError(f"Failed to fetch VLAN {vlan_id}: {e}") from e - - # ========================================================================= - # L2VPN/EVPN Methods - # ========================================================================= - - def get_l2vpns(self, l2vpn_type: str = "evpn") -> list[dict[str, Any]]: - """ - Get L2VPNs (EVPN instances). - - Args: - l2vpn_type: L2VPN type filter (default: 'evpn') - - Returns: - List of L2VPN dictionaries with VNI identifiers - """ - logger.debug(f"Fetching L2VPNs of type: {l2vpn_type}") - try: - l2vpns = self._api.vpn.l2vpns.filter(type=l2vpn_type) - return [dict(l2vpn) for l2vpn in l2vpns] - except RequestError as e: - raise NetBoxAPIError(f"Failed to fetch L2VPNs: {e}") from e - - def get_l2vpn_terminations( - self, device_name: str | None = None, l2vpn_name: str | None = None - ) -> list[dict[str, Any]]: - """ - Get L2VPN terminations (VLAN-to-device mappings). - - Note: NetBox 4.4 does not support filtering L2VPN terminations by device. - L2VPN terminations link to VLANs, not devices directly. - The device_name parameter is accepted for API compatibility but ignored. - - Args: - device_name: Ignored - not supported by NetBox API - l2vpn_name: Optional L2VPN name filter - - Returns: - List of L2VPN termination dictionaries - """ - if device_name: - logger.warning( - f"device_name filter '{device_name}' ignored - " - "L2VPN terminations cannot be filtered by device in NetBox 4.4" - ) - logger.debug(f"Fetching L2VPN terminations for l2vpn: {l2vpn_name or 'all'}") - try: - filters = {} - if l2vpn_name: - filters["l2vpn"] = l2vpn_name - - if filters: - terminations = self._api.vpn.l2vpn_terminations.filter(**filters) - else: - terminations = self._api.vpn.l2vpn_terminations.all() - return [dict(t) for t in terminations] - except RequestError as e: - raise NetBoxAPIError(f"Failed to fetch L2VPN terminations: {e}") from e - - def get_vni_for_vlan(self, vlan_id: int) -> int | None: - """ - Get VNI for a VLAN from L2VPN configuration. - - Args: - vlan_id: VLAN ID - - Returns: - VNI number or None if not mapped - """ - logger.debug(f"Fetching VNI for VLAN {vlan_id}") - try: - # Get L2VPN terminations for this VLAN - terminations = self._api.vpn.l2vpn_terminations.filter(vlan_id=vlan_id) - term_list = list(terminations) - if term_list: - l2vpn = term_list[0].l2vpn - if l2vpn and hasattr(l2vpn, "identifier"): - return l2vpn.identifier - return None - except RequestError as e: - raise NetBoxAPIError(f"Failed to fetch VNI for VLAN {vlan_id}: {e}") from e - - # ========================================================================= - # VRF Methods - # ========================================================================= - - def get_vrfs(self) -> list[dict[str, Any]]: - """ - Get all VRFs with route targets. - - Returns: - List of VRF dictionaries including import/export route targets - """ - logger.debug("Fetching all VRFs") - try: - vrfs = self._api.ipam.vrfs.all() - return [dict(v) for v in vrfs] - except RequestError as e: - raise NetBoxAPIError(f"Failed to fetch VRFs: {e}") from e - - def get_vrf(self, name: str) -> dict[str, Any] | None: - """ - Get a VRF by name. - - Args: - name: VRF name - - Returns: - VRF dictionary or None if not found - """ - logger.debug(f"Fetching VRF: {name}") - try: - vrf = self._api.ipam.vrfs.get(name=name) - if vrf: - return dict(vrf) - return None - except RequestError as e: - raise NetBoxAPIError(f"Failed to fetch VRF {name}: {e}") from e - - def get_device_vrf_interfaces( - self, device_name: str, vrf_name: str - ) -> list[dict[str, Any]]: - """ - Get interfaces assigned to a VRF on a device. - - Note: NetBox 4.4 does not support filtering interfaces by VRF directly. - VRF is assigned to IP addresses, not interfaces. This method queries - IP addresses with VRF filter and extracts the associated interfaces. - - Args: - device_name: Device name - vrf_name: VRF name - - Returns: - List of interface dictionaries assigned to the VRF - """ - logger.debug(f"Fetching VRF {vrf_name} interfaces for device: {device_name}") - try: - # Get IPs in this VRF for this device - ips = self._api.ipam.ip_addresses.filter(device=device_name, vrf=vrf_name) - - # Extract unique interface IDs - interface_ids = set() - for ip in ips: - if ip.assigned_object_type == "dcim.interface" and ip.assigned_object_id: - interface_ids.add(ip.assigned_object_id) - - # Fetch interfaces - if not interface_ids: - return [] - - interfaces = [] - for iid in interface_ids: - iface = self._api.dcim.interfaces.get(id=iid) - if iface: - interfaces.append(dict(iface)) - return interfaces - except RequestError as e: - raise NetBoxAPIError( - f"Failed to fetch VRF interfaces for {device_name}/{vrf_name}: {e}" - ) from e - - def get_route_targets(self) -> list[dict[str, Any]]: - """ - Get all route targets. - - Returns: - List of route target dictionaries - """ - logger.debug("Fetching all route targets") - try: - rts = self._api.ipam.route_targets.all() - return [dict(rt) for rt in rts] - except RequestError as e: - raise NetBoxAPIError(f"Failed to fetch route targets: {e}") from e - - # ========================================================================= - # MLAG Methods (Custom Fields) - # ========================================================================= - - def get_device_mlag_config(self, device_name: str) -> dict[str, Any]: - """ - Get MLAG configuration from device custom fields. - - Custom fields expected: - - mlag_domain_id: MLAG domain identifier - - mlag_peer_address: MLAG peer IP address - - mlag_local_address: MLAG local IP address - - mlag_virtual_mac: Shared virtual-router MAC - - Args: - device_name: Device name - - Returns: - MLAG configuration dictionary - """ - logger.debug(f"Fetching MLAG config for device: {device_name}") - device = self.get_device(device_name) - custom_fields = device.get("custom_fields", {}) or {} - - return { - "domain_id": custom_fields.get("mlag_domain_id"), - "peer_address": custom_fields.get("mlag_peer_address"), - "local_address": custom_fields.get("mlag_local_address"), - "virtual_mac": custom_fields.get("mlag_virtual_mac"), - } - - def get_mlag_peer_link(self, device_name: str) -> dict[str, Any] | None: - """ - Get MLAG peer-link interface for a device. - - Looks for interface with mlag_peer_link custom field set to True. - - Args: - device_name: Device name - - Returns: - Peer-link interface dictionary or None - """ - logger.debug(f"Fetching MLAG peer-link for device: {device_name}") - try: - interfaces = self._api.dcim.interfaces.filter(device=device_name) - for iface in interfaces: - cf = iface.custom_fields or {} - if cf.get("mlag_peer_link"): - return dict(iface) - return None - except RequestError as e: - raise NetBoxAPIError( - f"Failed to fetch MLAG peer-link for {device_name}: {e}" - ) from e - - def get_interface_mlag_id( - self, device_name: str, interface_name: str - ) -> int | None: - """ - Get MLAG ID from interface custom field. - - Args: - device_name: Device name - interface_name: Interface name - - Returns: - MLAG ID or None if not assigned - """ - logger.debug(f"Fetching MLAG ID for {device_name}:{interface_name}") - try: - iface = self._api.dcim.interfaces.get(device=device_name, name=interface_name) - if iface: - cf = iface.custom_fields or {} - return cf.get("mlag_id") - return None - except RequestError as e: - raise NetBoxAPIError( - f"Failed to fetch MLAG ID for {device_name}:{interface_name}: {e}" - ) from e - - # ========================================================================= - # VRF Custom Field Methods - # ========================================================================= - - def get_vrf_l3vni(self, vrf_name: str) -> int | None: - """ - Get L3 VNI from VRF custom field. - - Args: - vrf_name: VRF name - - Returns: - L3 VNI number or None if not assigned - """ - logger.debug(f"Fetching L3 VNI for VRF: {vrf_name}") - vrf = self.get_vrf(vrf_name) - if vrf: - return vrf.get("custom_fields", {}).get("l3vni") - return None - - # ========================================================================= - # Virtual IP Methods (Custom Fields) - # ========================================================================= - - def get_virtual_ips(self, device_name: str | None = None) -> list[dict[str, Any]]: - """ - Get IP addresses marked as virtual/anycast IPs. - - Retrieves IPs where the custom field `virtual_ip` is True. - These are typically anycast IPs shared across an MLAG pair. - - Args: - device_name: Optional device name filter - - Returns: - List of virtual IP address dictionaries - """ - logger.debug(f"Fetching virtual IPs for device: {device_name or 'all'}") - try: - if device_name: - ips = self._api.ipam.ip_addresses.filter(device=device_name) - else: - ips = self._api.ipam.ip_addresses.all() - - virtual_ips = [] - for ip in ips: - cf = ip.custom_fields or {} - if cf.get("virtual_ip"): - virtual_ips.append(dict(ip)) - return virtual_ips - except RequestError as e: - raise NetBoxAPIError(f"Failed to fetch virtual IPs: {e}") from e - - # ========================================================================= - # Aggregated Intent Methods - # ========================================================================= - - def get_device_intent(self, device_name: str) -> dict[str, Any]: - """ - Get complete fabric intent for a single device. - - Aggregates all relevant configuration data for a device including: - - Device info and custom fields - - Interfaces (physical, loopback, LAG, SVI) - - IP addresses - - BGP sessions and peer groups - - VLAN assignments - - L2VPN/EVPN terminations - - VRF assignments - - MLAG configuration - - Args: - device_name: Device name - - Returns: - Complete device intent dictionary - """ - logger.info(f"Building complete intent for device: {device_name}") - - # Get device info - device = self.get_device(device_name) - - # Get interfaces - all_interfaces = self.get_device_interfaces(device_name) - loopbacks = self.get_device_loopbacks(device_name) - lags = self.get_device_lags(device_name) - svis = self.get_device_svis(device_name) - - # Get IP addresses - ip_addresses = self.get_device_ips(device_name) - - # Get ASN - asn = self.get_device_asn(device_name) - - # Get BGP configuration - bgp_sessions = self.get_bgp_sessions(device_name) - bgp_peer_groups = self.get_bgp_peer_groups(device_name) - - # Get L2VPN terminations for this device - l2vpn_terminations = self.get_l2vpn_terminations(device_name=device_name) - - # Get MLAG configuration - mlag_config = self.get_device_mlag_config(device_name) - mlag_peer_link = self.get_mlag_peer_link(device_name) - - # Build intent dictionary - intent = { - "device": { - "name": device.get("name"), - "role": device.get("role", {}).get("slug") if device.get("role") else None, - "platform": ( - device.get("platform", {}).get("slug") if device.get("platform") else None - ), - "site": device.get("site", {}).get("slug") if device.get("site") else None, - "custom_fields": device.get("custom_fields", {}), - }, - "asn": asn, - "interfaces": { - "all": all_interfaces, - "loopbacks": loopbacks, - "lags": lags, - "svis": svis, - }, - "ip_addresses": ip_addresses, - "bgp": { - "sessions": bgp_sessions, - "peer_groups": bgp_peer_groups, - }, - "l2vpn_terminations": l2vpn_terminations, - "mlag": { - "config": mlag_config, - "peer_link": mlag_peer_link, - }, - } - - logger.debug(f"Built intent for {device_name} with {len(all_interfaces)} interfaces") - return intent - - def get_fabric_intent(self) -> dict[str, Any]: - """ - Get complete fabric intent for all devices. - - Returns: - Dictionary with fabric-wide intent including: - - All devices with their intent - - Fabric-wide VLANs - - Fabric-wide L2VPNs - - Fabric-wide VRFs - """ - logger.info("Building complete fabric intent") - - # Get all devices - spine_devices = self.get_spine_devices() - leaf_devices = self.get_leaf_devices() - - # Get fabric-wide configuration - vlans = self.get_vlans() - l2vpns = self.get_l2vpns() - vrfs = self.get_vrfs() - - # Build device intents - device_intents = {} - for device in spine_devices + leaf_devices: - name = device.get("name") - if name: - device_intents[name] = self.get_device_intent(name) - - return { - "devices": device_intents, - "vlans": vlans, - "l2vpns": l2vpns, - "vrfs": vrfs, - }