diff --git a/.gitignore b/.gitignore index 0734728..3a37d7a 100755 --- a/.gitignore +++ b/.gitignore @@ -1,12 +1,13 @@ .DS_Store documentation/assets/images/diagrams/.$VXLAN.drawio.bkp -clab-vxlan-evpn-l2/* +fabric_vxlan/* netbox/ .env .venv .python-version -containerlab/clab-vxlan-evpn-l2 -containerlab/.lab_vxlan.yml.bak +containerlab/clab-vxlan_fabric .vscode/settings.json vxlan_automation.egg-info __pycache__ +containerlab/.fabric_vxlan.yml.bak +containerlab/network_images/*.tar.xz diff --git a/containerlab/fabric_vxlan.yml b/containerlab/fabric_vxlan.yml index bf4f3f1..7d27c0f 100755 --- a/containerlab/fabric_vxlan.yml +++ b/containerlab/fabric_vxlan.yml @@ -1,47 +1,47 @@ -name: vxlan-evpn-l2 +name: vxlan_fabric mgmt: network: management topology: nodes: padc_sp1_00: kind: ceos - image: ceos:4.33.1F + image: ceos:4.33.2F mgmt-ipv4: 172.20.20.10 padc_sp2_00: kind: ceos - image: ceos:4.33.1F + image: ceos:4.33.2F mgmt-ipv4: 172.20.20.11 pa01_lf1_00: kind: ceos - image: ceos:4.33.1F + image: ceos:4.33.2F mgmt-ipv4: 172.20.20.100 pa02_lf2_00: kind: ceos - image: ceos:4.33.1F + image: ceos:4.33.2F mgmt-ipv4: 172.20.20.101 pa03_lf3_00: kind: ceos - image: ceos:4.33.1F + image: ceos:4.33.2F mgmt-ipv4: 172.20.20.102 pa04_lf4_00: kind: ceos - image: ceos:4.33.1F + image: ceos:4.33.2F mgmt-ipv4: 172.20.20.103 pa01_sw1_00: kind: ceos - image: ceos:4.33.1F + image: ceos:4.33.2F mgmt-ipv4: 172.20.20.110 pa02_sw1_00: kind: ceos - image: ceos:4.33.1F + image: ceos:4.33.2F mgmt-ipv4: 172.20.20.111 pa03_sw1_00: kind: ceos - image: ceos:4.33.1F + image: ceos:4.33.2F mgmt-ipv4: 172.20.20.112 pa04_sw1_00: kind: ceos - image: ceos:4.33.1F + image: ceos:4.33.2F mgmt-ipv4: 172.20.20.113 host1: kind: linux diff --git a/containerlab/network_images/network-images.md b/containerlab/network_images/network-images.md index b5ce608..e84b515 100644 --- a/containerlab/network_images/network-images.md +++ b/containerlab/network_images/network-images.md @@ -1,4 +1,4 @@ # Network images Arista cEOS image can be downlaoded at : arista.com -`cEOS64-lab-4.32.0.1F.tar.xz` +`cEOS64-lab-4.33.2F.tar.xz` diff --git a/documentation/INSTALLATION.md b/documentation/INSTALLATION.md index e5beec9..58b1533 100755 --- a/documentation/INSTALLATION.md +++ b/documentation/INSTALLATION.md @@ -2,11 +2,15 @@ ## Table of Contents -1. [Installing ContainerLab](#installing-containerlab) -2. [Installing Docker](#installing-docker) -3. [Images Installation](#images-installation) -4. [Install Netbox and plugins](#install-netbox-and-plugins) -5. [Sources](#sources) +- [Installation Guide](#installation-guide) + - [Table of Contents](#table-of-contents) + - [Installing ContainerLab](#installing-containerlab) + - [Installing Docker](#installing-docker) + - [Images installation](#images-installation) + - [Arista cEOS](#arista-ceos) + - [Nokia SR Linux](#nokia-sr-linux) + - [Install Netbox and plugins](#install-netbox-and-plugins) + - [Sources](#sources) ## Installing ContainerLab @@ -64,7 +68,7 @@ Once you created an account, please logged in and down the cEOS docker images. To add this new image to docker, please use the docker CLI command : ```bash -docker import cEOS64-lab-4.32.0.1F.tar.xz ceos:4.32.0.1F +docker import cEOS64-lab-4.33.2F.tar.xz ceos:4.33.2F ``` ### Nokia SR Linux @@ -85,8 +89,6 @@ ghcr.io/nokia/srlinux latest 801eb020ad70 11 days ago 2.59GB ## Install Netbox and plugins For this project, we need to install specific plugin : - - [Netbox BGP](https://github.com/netbox-community/netbox-bgp) - - [Netbox Diode](https://github.com/netboxlabs/diode) - [Netbox Topology Views](https://github.com/netbox-community/netbox-topology-views) ```bash @@ -95,8 +97,6 @@ ghcr.io/nokia/srlinux latest 801eb020ad70 11 days ago 2.59GB touch plugin_requirements.txt Dockerfile-Plugins docker-compose.override.yml cat < plugin_requirements.txt netbox_topology_views - netboxlabs-diode-netbox-plugin - netbox-napalm-plugin EOF ``` @@ -165,7 +165,7 @@ ghcr.io/nokia/srlinux latest 801eb020ad70 11 days ago 2.59GB ```python PLUGINS = [ - "netbox-topology-views" + "netbox_topology_views" ] ``` diff --git a/templates/leaves.j2 b/templates/leaves.j2 index b018eaf..5f014f6 100644 --- a/templates/leaves.j2 +++ b/templates/leaves.j2 @@ -1,44 +1,157 @@ -{# Interface Configuration #} -{% for interface in device.interfaces.all() %} +{# Leaf Configuration Template #} +{%- for interface in device.interfaces.all() %} interface {{ interface.name }} {%- if interface.description %} -description {{ interface.description }} + description {{ interface.description }} +{%- endif %} +{%- if interface.name == 'Loopback0' %} + description VTEP +{%- endif %} +{%- if interface.name == 'Ethernet3' %} + switchport mode trunk + no shutdown + mtu 9214 +{%- else %} + no shutdown + no switchport +{%- set ip_address = interface.ip_addresses.first() %} +{%- if ip_address %} + ip address {{ ip_address.address }} +{%- endif %} + mtu 9214 {%- endif %} -no shutdown -no switchport -ip address {{ ipam.IPAddress.objects.get(assigned_object_id=interface.id).address }} -mtu 9214 ! -{% endfor %} +{%- endfor %} -{# BGP Configuration #} -{% set loopback_interface = device.interfaces.get(name='Loopback0') %} -{% set router_id = ipam.IPAddress.objects.get(assigned_object_id=loopback_interface.id).address %} +service routing protocols model multi-agent +ip routing + +{# BGP Route-Maps and Prefix Lists #} +{%- set loopback_ip = device.interfaces.get(name='Loopback0').ip_addresses.first().address %} +ip prefix-list VTEP_PREFIX seq 10 permit {{ loopback_ip }} +! +route-map RMAP_VTEP permit 10 + match ip address prefix-list VTEP_PREFIX +! + +{# Complete BGP Configuration #} +{%- set router_id = loopback_ip.ip %} router bgp {{ device.custom_field_data.ASN }} -router-id {{ router_id }} -maximum-paths 4 ecmp 4 -neighbor SPINE_GROUP peer group -neighbor SPINE_GROUP allowas-in 1 -neighbor SPINE_GROUP ebgp-multihop 4 -neighbor SPINE_GROUP send-community extended -neighbor SPINE_GROUP maximum-routes 12000 + router-id {{ router_id }} + maximum-paths 4 ecmp 4 + ! + neighbor SPINE_GROUP peer group + neighbor SPINE_GROUP allowas-in 1 + neighbor SPINE_GROUP ebgp-multihop 4 + neighbor SPINE_GROUP send-community extended + neighbor SPINE_GROUP maximum-routes 12000 + ! + neighbor VTEP_GROUP peer group + neighbor VTEP_GROUP ebgp-multihop 5 + neighbor VTEP_GROUP send-community extended {%- for interface in device.interfaces.all() %} - {%- if interface.connected_endpoints and interface.name != 'Ethernet3' %} - {%- for remote_interface in interface.connected_endpoints %} - {%- set remote_ip = ipam.IPAddress.objects.get(assigned_object_id=remote_interface.id) %} -neighbor {{ remote_ip.address }} peer group SPINE_GROUP -neighbor {{ remote_ip.address }} remote-as {{ remote_interface.device.custom_field_data.ASN }} - {%- endfor %} - {%- endif %} +{%- if interface.connected_endpoints and interface.name != 'Ethernet3' and interface.name != 'Loopback0' %} +{%- set local_ip = interface.ip_addresses.first() %} +{%- if local_ip %} +{%- for remote_interface in interface.connected_endpoints %} +{%- set remote_ip = remote_interface.ip_addresses.first() %} +{%- if remote_ip %} + neighbor {{ remote_ip.address.ip }} peer group SPINE_GROUP + neighbor {{ remote_ip.address.ip }} remote-as {{ remote_interface.device.custom_field_data.ASN }} +{%- endif %} +{%- endfor %} +{%- endif %} +{%- endif %} +{%- endfor %} +{%- set other_leafs = device.site.devices.filter(role__slug='leaf').exclude(id=device.id) %} +{%- for leaf in other_leafs %} +{%- set leaf_lo = leaf.interfaces.get(name='Loopback0').ip_addresses.first() %} +{%- if leaf_lo %} + neighbor {{ leaf_lo.address.ip }} peer group VTEP_GROUP + neighbor {{ leaf_lo.address.ip }} remote-as {{ leaf.custom_field_data.ASN }} + neighbor {{ leaf_lo.address.ip }} update-source Loopback0 +{%- endif %} +{%- endfor %} + ! + address-family ipv4 +{%- for interface in device.interfaces.all() %} +{%- if interface.connected_endpoints and interface.name != 'Ethernet3' and interface.name != 'Loopback0' %} +{%- set local_ip = interface.ip_addresses.first() %} +{%- if local_ip %} +{%- for remote_interface in interface.connected_endpoints %} +{%- set remote_ip = remote_interface.ip_addresses.first() %} +{%- if remote_ip %} + neighbor {{ remote_ip.address.ip }} activate +{%- endif %} +{%- endfor %} +{%- endif %} +{%- endif %} +{%- endfor %} + redistribute connected route-map RMAP_VTEP + ! + address-family evpn +{%- for leaf in other_leafs %} +{%- set leaf_lo = leaf.interfaces.get(name='Loopback0').ip_addresses.first() %} +{%- if leaf_lo %} + neighbor {{ leaf_lo.address.ip }} activate +{%- endif %} {%- endfor %} ! -address-family ipv4 -{%- for interface in device.interfaces.all() %} - {%- if interface.connected_endpoints and interface.name != 'Ethernet3' %} - {%- for remote_interface in interface.connected_endpoints %} - {%- set remote_ip = ipam.IPAddress.objects.get(assigned_object_id=remote_interface.id) %} -neighbor {{ remote_ip.address }} activate - {%- endfor %} - {%- endif %} + +{# VXLAN Configuration #} +{%- if device.location and device.location.tenant %} +{%- set tenant = device.location.tenant %} +{%- set tenant_vlans = tenant.vlans.all() %} +{%- set tenant_prefix = tenant.prefixes.first() %} +{%- set tenant_l2vpns = tenant.l2vpns.filter(type='vxlan-evpn') %} +{%- set loopback_ip = device.interfaces.get(name='Loopback0').ip_addresses.first().address.ip %} +ip virtual-router mac-address 00:0a:bc:10:11:02 +! +{%- for vlan in tenant_vlans %} +vlan {{ vlan.vid }} + name IRB_{{ tenant.name|upper }}_SERVICE +! +interface Vlan{{ vlan.vid }} + description {{ tenant.name }} Service Interface + vrf {{ tenant.name|upper }} +{%- if tenant_prefix %} +{%- set network = tenant_prefix.prefix.network %} +{%- set first_ip = network + 1 %} + ip address virtual {{ first_ip }}/{{ tenant_prefix.prefix.prefixlen }} +{%- endif %} +! {%- endfor %} -! \ No newline at end of file + +{%- set spine_loopbacks = [] %} +{%- for spine in device.site.devices.filter(role__slug='spine') %} +{%- set spine_lo = spine.interfaces.get(name='Loopback0').ip_addresses.first() %} +{%- if spine_lo %} +{%- set _ = spine_loopbacks.append(spine_lo.address.ip) %} +{%- endif %} +{%- endfor %} + +{%- for l2vpn in tenant_l2vpns %} +{%- set vxlan_index = loop.index %} +interface Vxlan{{ vxlan_index }} + description VTI + vxlan source-interface Loopback0 +{%- for vlan in tenant_vlans %} + vxlan vlan {{ vlan.vid }} vni {{ l2vpn.identifier }} +{%- for spine_ip in spine_loopbacks %} + vxlan vlan {{ vlan.vid }} flood vtep {{ spine_ip }} +{%- endfor %} +{%- endfor %} +! +{%- endfor %} + +router bgp {{ device.custom_field_data.ASN }} +{%- for vlan in tenant_vlans %} +{%- set l2vpn = tenant_l2vpns.first() %} +{%- if l2vpn %} + vlan {{ vlan.vid }} + rd {{ loopback_ip }}:{{ l2vpn.identifier }} + route-target both {{ l2vpn.identifier }}:{{ vlan.vid }} + redistribute learned +{%- endif %} +{%- endfor %} +{%- endif %} \ No newline at end of file diff --git a/templates/spines.j2 b/templates/spines.j2 index 7ed437b..11662a9 100644 --- a/templates/spines.j2 +++ b/templates/spines.j2 @@ -1,44 +1,50 @@ -{# Interface Configuration #} -{% for interface in device.interfaces.all() %} +{%- for interface in device.interfaces.all() %} interface {{ interface.name }} {%- if interface.description %} -description {{ interface.description }} + description {{ interface.description }} {%- endif %} -no shutdown -no switchport -ip address {{ ipam.IPAddress.objects.get(assigned_object_id=interface.id).address }} -mtu 9214 + no shutdown + no switchport +{%- set ip_address = interface.ip_addresses.first() %} +{%- if ip_address %} + ip address {{ ip_address.address }} +{%- endif %} + mtu 9214 ! -{% endfor %} +{%- endfor %} {# BGP Configuration #} -{% set loopback_interface = device.interfaces.get(name='Loopback0') %} -{% set router_id = ipam.IPAddress.objects.get(assigned_object_id=loopback_interface.id).address %} +ip routing +{%- set loopback_interface = device.interfaces.get(name='Loopback0') %} +{%- set router_id = loopback_interface.ip_addresses.first().address.ip %} router bgp {{ device.custom_field_data.ASN }} -router-id {{ router_id }} -maximum-paths 4 ecmp 4 -neighbor LEAF_GROUP peer group -neighbor LEAF_GROUP allowas-in 1 -neighbor LEAF_GROUP ebgp-multihop 4 -neighbor LEAF_GROUP send-community extended -neighbor LEAF_GROUP maximum-routes 12000 + router-id {{ router_id }} + maximum-paths 4 ecmp 4 + neighbor LEAF_GROUP peer group + neighbor LEAF_GROUP allowas-in 1 + neighbor LEAF_GROUP ebgp-multihop 4 + neighbor LEAF_GROUP send-community extended + neighbor LEAF_GROUP maximum-routes 12000 {%- for interface in device.interfaces.all() %} - {%- if interface.connected_endpoints %} - {%- for remote_interface in interface.connected_endpoints %} - {%- set remote_ip = ipam.IPAddress.objects.get(assigned_object_id=remote_interface.id) %} -neighbor {{ remote_ip.address }} peer group LEAF_GROUP -neighbor {{ remote_ip.address }} remote-as {{ remote_interface.device.custom_field_data.ASN }} - {%- endfor %} - {%- endif %} +{%- if interface.connected_endpoints %} +{%- for remote_interface in interface.connected_endpoints %} +{%- set remote_ip = remote_interface.ip_addresses.first() %} +{%- if remote_ip %} + neighbor {{ remote_ip.address.ip }} peer group LEAF_GROUP + neighbor {{ remote_ip.address.ip }} remote-as {{ remote_interface.device.custom_field_data.ASN }} +{%- endif %} {%- endfor %} -! -address-family ipv4 +{%- endif %} +{%- endfor %} + address-family ipv4 {%- for interface in device.interfaces.all() %} - {%- if interface.connected_endpoints %} - {%- for remote_interface in interface.connected_endpoints %} - {%- set remote_ip = ipam.IPAddress.objects.get(assigned_object_id=remote_interface.id) %} -neighbor {{ remote_ip.address }} activate - {%- endfor %} - {%- endif %} +{%- if interface.connected_endpoints %} +{%- for remote_interface in interface.connected_endpoints %} +{%- set remote_ip = remote_interface.ip_addresses.first() %} +{%- if remote_ip %} + neighbor {{ remote_ip.address.ip }} activate +{%- endif %} +{%- endfor %} +{%- endif %} {%- endfor %} ! \ No newline at end of file diff --git a/utilities/Create_Fabric/add_customers.py b/utilities/Create_Fabric/add_customers.py index dfca09c..e79ec81 100644 --- a/utilities/Create_Fabric/add_customers.py +++ b/utilities/Create_Fabric/add_customers.py @@ -1,68 +1,250 @@ -from helpers.netbox_backend import NetBoxBackend +#!/usr/bin/env python3 +""" +add_customer.py +============== +Script pour ajouter un nouveau client dans NetBox avec : +- Création du tenant +- Attribution des locations +- Allocation d'un préfixe /24 +- Configuration VXLAN/VLAN +- Attribution des interfaces clients +""" + import sys +import logging +from typing import List, Optional +from dataclasses import dataclass -# Ask user for NetBox connection details -url = input("Enter NetBox URL: ") -token = input("Enter NetBox API Token: ") -nb_backend = NetBoxBackend(url, token) +from helpers.netbox_backend import NetBoxBackend -# Ask for customer details -customer_name = input("Enter Customer Name: ") -vlan_id = int(input("Enter VLAN ID: ")) -vni_id = int(input("Enter VNI ID: ")) +# Configuration du logging +logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s') +logger = logging.getLogger(__name__) -# Get available locations -locations = list(nb_backend.nb.dcim.locations.all()) -for idx, loc in enumerate(locations): - print(f"{idx}: {loc.name}") -selected_indices = input("Select one or multiple locations by index (comma-separated): ") -selected_locations = [loc for i, loc in enumerate(locations) if str(i) in selected_indices.split(",")] +@dataclass +class CustomerConfig: + """Configuration du client""" + name: str + slug: str + vlan_id: int + vni_id: int + locations: List[dict] -# Create tenant -tenant = nb_backend.create_tenant(customer_name, customer_name.lower().replace(" ", "-")) +class CustomerProvisioner: + """Gère la provision d'un nouveau client dans NetBox""" -# Update locations to attach them to the tenant -for location in selected_locations: - try: - location.tenant = tenant.id - location.save() - except Exception as e: - print(f"[ERROR] Failed to update location {location.name} with tenant: {e}") + def __init__(self, netbox: NetBoxBackend): + self.netbox = netbox + self.config: Optional[CustomerConfig] = None + self.tenant: Optional[dict] = None + self.vlan: Optional[dict] = None + self.l2vpn: Optional[dict] = None -# Allocate /24 prefix for customer -role_id = nb_backend.nb.ipam.roles.get(slug="customerscontainer").id -parent_prefixes = list(nb_backend.nb.ipam.prefixes.filter(role_id=role_id)) -if not parent_prefixes: - print("[ERROR] No available parent prefix found.") - sys.exit(1) + def get_user_input(self) -> CustomerConfig: + """Récupère les informations du client depuis l'utilisateur""" + try: + # Informations de base + customer_name = input("Enter Customer Name: ").strip() + if not customer_name: + raise ValueError("Customer name is required") -customer_prefix = nb_backend.allocate_prefix(parent_prefixes[0], 24, None, None) -if not customer_prefix: - print("[ERROR] Could not allocate /24 for customer.") - sys.exit(1) + customer_slug = customer_name.lower().replace(" ", "-") + + # VLAN et VNI + vlan_id = int(input("Enter VLAN ID (1-4094): ").strip()) + if not 1 <= vlan_id <= 4094: + raise ValueError("VLAN ID must be between 1 and 4094") -# Create L2VPN -l2vpn_slug = f"{customer_name.lower().replace(' ', '-')}-vpn" -l2vpn = nb_backend.create_l2vpn(vni_id, f"{customer_name}_vpn", l2vpn_slug, tenant.id) + vni_id = int(input("Enter VNI ID: ").strip()) + if vni_id < 1: + raise ValueError("VNI ID must be positive") -# Create VLAN -vlan_slug = f"{customer_name.lower().replace(' ', '-')}-vlan" -vlan = nb_backend.create_vlan(vlan_id, f"{customer_name}_vlan", vlan_slug, tenant.id) + # Sélection des locations + locations = list(self.netbox.nb.dcim.locations.all()) + if not locations: + raise ValueError("No locations found in NetBox") -# Create VXLAN termination -vxlan_termination = nb_backend.create_vxlan_termination(l2vpn.id, "ipam.vlan", vlan.id) + print("\nAvailable Locations:") + for idx, loc in enumerate(locations): + print(f"{idx}: {loc.name}") -# Assign IP to leaf devices Ethernet3 -for location in selected_locations: - leaf_devices = nb_backend.nb.dcim.devices.filter(role="leaf", location_id=location.id) - if leaf_devices: - ip_list = nb_backend.get_available_ips_in_prefix(customer_prefix) - if len(ip_list) < len(leaf_devices): - print("[ERROR] Not enough IP addresses available in the allocated /24.") + indices = input("Select locations (comma-separated indices): ").strip() + selected_locations = [ + loc for i, loc in enumerate(locations) + if str(i) in indices.split(",") + ] + + if not selected_locations: + raise ValueError("At least one location must be selected") + + return CustomerConfig( + name=customer_name, + slug=customer_slug, + vlan_id=vlan_id, + vni_id=vni_id, + locations=selected_locations + ) + + except ValueError as e: + logger.error(f"Invalid input: {str(e)}") sys.exit(1) + + def create_tenant(self) -> None: + """Crée le tenant pour le client""" + self.tenant = self.netbox.create_tenant( + self.config.name, + self.config.slug + ) + if not self.tenant: + raise RuntimeError(f"Failed to create tenant for {self.config.name}") - for device, ip in zip(leaf_devices, ip_list): - interface = nb_backend.get_or_create_interface(device.id, "Ethernet3") - nb_backend.assign_ip_to_interface(interface, ip.address) - else: - print(f"[ERROR] No leaf devices found in location {location.name}.") + logger.info(f"Created tenant: {self.tenant.name}") + + def assign_locations(self) -> None: + """Assigne les locations au tenant""" + for location in self.config.locations: + try: + location.tenant = self.tenant.id + location.save() + logger.info(f"Assigned location {location.name} to tenant") + except Exception as e: + logger.error(f"Failed to update location {location.name}: {str(e)}") + + def allocate_prefix(self) -> None: + """Alloue un préfixe /24 pour le client""" + try: + role = self.netbox.nb.ipam.roles.get(slug="customerscontainer") + if not role: + raise ValueError("Customer container role not found") + + parent_prefixes = list(self.netbox.nb.ipam.prefixes.filter(role_id=role.id)) + if not parent_prefixes: + raise ValueError("No available parent prefix found") + + customer_prefix = self.netbox.allocate_prefix( + parent_prefixes[0], 24, None, None, self.tenant.id + ) + if not customer_prefix: + raise RuntimeError("Failed to allocate /24 prefix") + + logger.info(f"Allocated prefix: {customer_prefix.prefix}") + return customer_prefix + + except Exception as e: + logger.error(f"Prefix allocation failed: {str(e)}") + raise + + def setup_vxlan(self) -> None: + """Configure VXLAN et VLAN pour le client""" + try: + # Création du L2VPN + self.l2vpn = self.netbox.create_l2vpn( + self.config.vni_id, + f"{self.config.name}_vpn", + f"{self.config.slug}-vpn", + self.tenant.id + ) + if not self.l2vpn: + raise RuntimeError("Failed to create L2VPN") + + # Création du VLAN + self.vlan = self.netbox.create_vlan( + self.config.vlan_id, + f"{self.config.name}_vlan", + f"{self.config.slug}-vlan", + self.tenant.id + ) + if not self.vlan: + raise RuntimeError("Failed to create VLAN") + + # Création de la terminaison VXLAN + vxlan_termination = self.netbox.create_vxlan_termination( + self.l2vpn.id, + "ipam.vlan", + self.vlan.id + ) + if not vxlan_termination: + raise RuntimeError("Failed to create VXLAN termination") + + logger.info(f"Created VXLAN configuration for {self.config.name}") + + except Exception as e: + logger.error(f"VXLAN setup failed: {str(e)}") + raise + + def configure_interfaces(self) -> None: + """Configure les interfaces client sur les leafs""" + for location in self.config.locations: + leaf_devices = self.netbox.nb.dcim.devices.filter( + role="leaf", + location_id=location.id + ) + + if not leaf_devices: + logger.warning(f"No leaf devices found in location {location.name}") + continue + + for device in leaf_devices: + interface = self.netbox.get_or_create_interface(device.id, "Ethernet3") + if not interface: + logger.error(f"Failed to get/create interface for {device.name}") + continue + + try: + interface.custom_field_data = {'Customer': self.tenant.id} + interface.save() + logger.info(f"Configured interface on {device.name}") + except Exception as e: + logger.error(f"Failed to configure interface on {device.name}: {str(e)}") + + def provision(self) -> None: + """Processus principal de provision du client""" + try: + # 1. Récupération des informations + self.config = self.get_user_input() + + # 2. Création du tenant + self.create_tenant() + + # 3. Attribution des locations + self.assign_locations() + + # 4. Allocation du préfixe + self.allocate_prefix() + + # 5. Configuration VXLAN + self.setup_vxlan() + + # 6. Configuration des interfaces + self.configure_interfaces() + + logger.info(f"Successfully provisioned customer: {self.config.name}") + + except Exception as e: + logger.error(f"Customer provisioning failed: {str(e)}") + sys.exit(1) + +def main(): + """Point d'entrée principal""" + try: + # Connexion à NetBox + netbox_url = input("Enter NetBox URL: ").strip() + netbox_token = input("Enter NetBox API Token: ").strip() + + if not all([netbox_url, netbox_token]): + raise ValueError("NetBox URL and token are required") + + netbox = NetBoxBackend(netbox_url, netbox_token) + if not netbox.check_connection(): + raise ConnectionError("Failed to connect to NetBox") + + # Provision du client + provisioner = CustomerProvisioner(netbox) + provisioner.provision() + + except Exception as e: + logger.error(f"Error: {str(e)}") + sys.exit(1) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/utilities/Create_Fabric/config.py b/utilities/Create_Fabric/config.py new file mode 100644 index 0000000..e5f25c2 --- /dev/null +++ b/utilities/Create_Fabric/config.py @@ -0,0 +1,35 @@ +"""Configuration et constantes pour la création de fabric VXLAN""" + +from dataclasses import dataclass +from enum import Enum +from typing import Optional + +class InterfaceTypes(Enum): + """Types d'interfaces supportés""" + QSFPP = "40gbase-x-qsfpp" + SFP = "1000base-x-sfp" + SFP_PLUS = "10gbase-x-sfpp" + VIRTUAL = "virtual" + +class DeviceRoles(Enum): + """Rôles des équipements""" + SPINE = "spine" + LEAF = "leaf" + ACCESS = "access" + +class IPRoles(Enum): + """Rôles des préfixes IP""" + UNDERLAY = "underlaycontainer" + LOOPBACK = "loopbackcontainer" + +@dataclass +class FabricConfig: + """Configuration de la fabric VXLAN""" + site_code: str + num_buildings: int + spine_type: str + leaf_type: str + access_type: str + tenant_id: Optional[int] = None + base_spine_asn: int = 65001 + base_leaf_asn: int = 65101 \ No newline at end of file diff --git a/utilities/Create_Fabric/create_vxlan_fabric.py b/utilities/Create_Fabric/create_vxlan_fabric.py deleted file mode 100644 index 94578b3..0000000 --- a/utilities/Create_Fabric/create_vxlan_fabric.py +++ /dev/null @@ -1,343 +0,0 @@ -#!/usr/bin/env python3 -""" -create_vxlan_fabric.py (version avec NetBoxBackend) - -Ce script illustre comment créer une fabric VXLAN sur NetBox, -notamment : - - Création ou sélection d'un site. - - Création de spines, leaves, et access. - - Création du câblage. - - Allocation automatique des /31 pour les liaisons. - - Attribution d'un /32 loopback par device. - - Attribution automatique d'ASN (Custom Field "ASN"). - -Il utilise une classe d'abstraction "NetBoxBackend" (dans NetBox_backend.py) -pour simplifier et clarifier les interactions avec l'API. -""" - -import getpass -import sys - -from helpers.netbox_backend import NetBoxBackend - - -def main(): - print("=== VXLAN Fabric Creation Script (via NetBoxBackend) ===") - - # 1) NetBox details - netbox_url = input("NetBox URL (e.g. https://netbox.local): ").strip() - if not netbox_url: - print("ERROR: NetBox URL is required.") - sys.exit(1) - - netbox_token = getpass.getpass("NetBox API Token: ") - if not netbox_token: - print("ERROR: NetBox API token is required.") - sys.exit(1) - - # 2) Init the NetBox backend wrapper - try: - nb = NetBoxBackend(netbox_url, netbox_token, verify_ssl=True) - except Exception as exc: - print(f"ERROR: Failed to connect to NetBox: {exc}") - sys.exit(1) - - # 3) Choose or create Site - existing_sites = nb.get_sites() - if not existing_sites: - print("No sites found in NetBox.") - sys.exit(1) - - print("\nExisting Sites:") - for idx, s in enumerate(existing_sites, start=1): - print(f" {idx}. {s.name} (slug={s.slug})") - - choice = input("Choose a site by number, or type 'new' to create one: ").strip().lower() - if choice == "new": - site_name = input("New site name (e.g. 'Paris'): ").strip() - site_code_input = input("New site code (e.g. 'PA'): ").strip() - if not site_name or not site_code_input: - print("ERROR: Site name and code required.") - sys.exit(1) - try: - site = nb.create_site(site_name, site_code_input.lower()) - print(f"Created new site: {site.name} ({site.slug})") - except Exception as exc: - print(f"ERROR: Failed to create site: {exc}") - sys.exit(1) - - site_clean = site.name.strip() - site_code = site_clean[:2].upper() if len(site_clean) >= 2 else site_clean.upper() - else: - try: - site_index = int(choice) - site = existing_sites[site_index - 1] - site_clean = site.name.strip() - site_code = site_clean[:2].upper() if len(site_clean) >= 2 else site_clean.upper() - except (ValueError, IndexError): - print("ERROR: Invalid site selection.") - sys.exit(1) - - # 4) Number of buildings - while True: - try: - num_buildings = int(input("How many buildings? (1–5): ").strip()) - if 1 <= num_buildings <= 5: - break - else: - print("ERROR: Please choose between 1 and 5.") - except ValueError: - print("ERROR: Invalid input. Try again.") - - # 5) Device type slugs - print("\nEnter device type slugs (must exist in NetBox).") - spine_devtype_slug = input("Spine Device Type Slug: ").strip() - leaf_devtype_slug = input("Leaf Device Type Slug: ").strip() - access_devtype_slug = input("Access Switch Device Type Slug: ").strip() - - # 6) Roles - spine_role = nb.get_device_role("spine") - if not spine_role: - print("ERROR: No device role with slug='spine'.") - sys.exit(1) - - leaf_role = nb.get_device_role("leaf") - if not leaf_role: - print("ERROR: No device role with slug='leaf'.") - sys.exit(1) - - access_role = nb.get_device_role("access") - if not access_role: - print("ERROR: No device role with slug='access'.") - sys.exit(1) - - print(f"Using roles -> Spine={spine_role.id}, Leaf={leaf_role.id}, Access={access_role.id}") - - # 7) Create / Retrieve 2 Spines - spine_names = [f"{site_code.lower()}dc_sp1_00", f"{site_code.lower()}dc_sp2_00"] - spines = [] - - for name in spine_names: - try: - new_spine = nb.create_device( - name=name, - device_type_slug=spine_devtype_slug, - role_id=spine_role.id, - site_id=site.id - ) - print(f"Spine: {new_spine.name}") - spines.append(new_spine) - except Exception as exc: - print(f"ERROR creating spine '{name}': {exc}") - sys.exit(1) - - # 8) Create Leaves + Access per building - leaves = [] - access_switches = [] - - # Helper to create/find location - def get_or_create_location(site_obj, location_name: str): - existing_loc = nb.nb.dcim.locations.get(site_id=site_obj.id, name=location_name) - if existing_loc: - print(f"Location '{existing_loc.name}' already exists; reusing.") - return existing_loc - try: - loc = nb.nb.dcim.locations.create( - name=location_name, - slug=location_name.lower(), - site=site_obj.id - ) - print(f"Created Location '{loc.name}'") - return loc - except Exception as loc_exc: - print(f"ERROR creating location '{location_name}': {loc_exc}") - sys.exit(1) - - for b_num in range(1, num_buildings + 1): - building_code = f"{site_code}{b_num}" - location = get_or_create_location(site, building_code) - - # Leaf device - leaf_name = f"{site_code.lower()}{str(b_num).zfill(2)}_lf1_00" - try: - leaf_dev = nb.create_device( - name=leaf_name, - device_type_slug=leaf_devtype_slug, - role_id=leaf_role.id, - site_id=site.id, - location_id=location.id - ) - print(f"Leaf: {leaf_dev.name}") - except Exception as exc: - print(f"ERROR creating leaf '{leaf_name}': {exc}") - sys.exit(1) - leaves.append(leaf_dev) - - # Access Switch - sw_name = f"{site_code.lower()}{str(b_num).zfill(2)}_sw1_00" - try: - acc_dev = nb.create_device( - name=sw_name, - device_type_slug=access_devtype_slug, - role_id=access_role.id, - site_id=site.id, - location_id=location.id - ) - print(f"Access Switch: {acc_dev.name}") - except Exception as exc: - print(f"ERROR creating access switch '{sw_name}': {exc}") - sys.exit(1) - access_switches.append(acc_dev) - - # 9) Cabling - def create_leaf_spine_cables(leaf_dev, spine_dev, leaf_if_name, spine_if_name): - leaf_if = nb.get_or_create_interface(leaf_dev.id, leaf_if_name) - spine_if = nb.get_or_create_interface(spine_dev.id, spine_if_name) - nb.create_cable_if_not_exists(leaf_if, spine_if) - - for i, leaf_dev in enumerate(leaves, start=1): - # Leaf <-> Spine1 sur Ethernet1 - create_leaf_spine_cables(leaf_dev, spines[0], "Ethernet1", f"Ethernet{i}") - # Leaf <-> Spine2 sur Ethernet2 - create_leaf_spine_cables(leaf_dev, spines[1], "Ethernet2", f"Ethernet{i}") - - # Leaf <-> Access Switch sur Ethernet3 (leaf) / Ethernet1 (access) - leaf_eth3 = nb.get_or_create_interface(leaf_dev.id, "Ethernet3") - acc_dev = access_switches[i - 1] - acc_if = nb.get_or_create_interface(acc_dev.id, "Ethernet1") - nb.create_cable_if_not_exists(leaf_eth3, acc_if) - - # 10) IP Assignments (/31) + ASN custom field - # 10a) Récupérer le prefix underlay - underlay_role = nb.nb.ipam.roles.get(slug="underlaycontainer") - if not underlay_role: - print("ERROR: No IPAM role 'underlaycontainer' found.") - sys.exit(1) - - underlay_pfxs = nb.nb.ipam.prefixes.filter(role_id=underlay_role.id, scope_id=site.id) - underlay_list = list(underlay_pfxs) - if not underlay_list: - print("ERROR: No underlay prefix found for this site.") - sys.exit(1) - - parent_prefix = underlay_list[0] - print(f"Using parent prefix '{parent_prefix.prefix}' for /31 allocations.") - - # 10b) Assign ASNs (spines 65001, leaves 65101) - next_spine_asn = 65001 - next_leaf_asn = 65101 - - # Spines - for spine_dev in spines: - dev_obj = nb.nb.dcim.devices.get(spine_dev.id) - if not dev_obj: - print(f"ERROR: Could not re-fetch spine '{spine_dev.name}'") - sys.exit(1) - if "ASN" not in dev_obj.custom_fields: - print(f"[WARNING] Spine '{dev_obj.name}' has no custom field 'ASN'.") - else: - dev_obj.custom_fields["ASN"] = next_spine_asn - try: - dev_obj.save() - print(f"Assigned ASN={next_spine_asn} to spine '{dev_obj.name}'.") - except Exception as exc: - print(f"ERROR saving 'ASN' on {dev_obj.name}: {exc}") - next_spine_asn += 1 - - # Leaves - for leaf_dev in leaves: - dev_obj = nb.nb.dcim.devices.get(leaf_dev.id) - if not dev_obj: - print(f"ERROR: Could not re-fetch leaf '{leaf_dev.name}'") - sys.exit(1) - if "ASN" not in dev_obj.custom_fields: - print(f"[WARNING] Leaf '{dev_obj.name}' has no custom field 'ASN'.") - else: - dev_obj.custom_fields["ASN"] = next_leaf_asn - try: - dev_obj.save() - print(f"Assigned ASN={next_leaf_asn} to leaf '{dev_obj.name}'.") - except Exception as exc: - print(f"ERROR saving 'ASN' on {dev_obj.name}: {exc}") - next_leaf_asn += 1 - - # 10c) Allouer /31 pour chaque liaison Spine<->Leaf - for i, leaf_dev in enumerate(leaves, start=1): - # Leaf.Eth1 <-> Spine1.Eth{i} - leaf_eth1 = nb.nb.dcim.interfaces.get(device_id=leaf_dev.id, name="Ethernet1") - sp1_if = nb.nb.dcim.interfaces.get(device_id=spines[0].id, name=f"Ethernet{i}") - - child_31 = nb.allocate_prefix(parent_prefix, 31, site.id, underlay_role.id) - if not child_31: - print("ERROR: Could not allocate /31 for Spine1<->Leaf.") - sys.exit(1) - ip_list = nb.get_available_ips_in_prefix(child_31) - if len(ip_list) < 2: - print("ERROR: Not enough IP addresses in newly allocated /31.") - sys.exit(1) - - nb.assign_ip_to_interface(sp1_if, ip_list[0].address) - nb.assign_ip_to_interface(leaf_eth1, ip_list[1].address) - - # Leaf.Eth2 <-> Spine2.Eth{i} - leaf_eth2 = nb.nb.dcim.interfaces.get(device_id=leaf_dev.id, name="Ethernet2") - sp2_if = nb.nb.dcim.interfaces.get(device_id=spines[1].id, name=f"Ethernet{i}") - - child_31b = nb.allocate_prefix(parent_prefix, 31, site.id, underlay_role.id) - if not child_31b: - print("ERROR: No /31 returned for Spine2<->Leaf.") - sys.exit(1) - ip_list_b = nb.get_available_ips_in_prefix(child_31b) - if len(ip_list_b) < 2: - print("ERROR: Not enough IP addresses in newly allocated /31.") - sys.exit(1) - - nb.assign_ip_to_interface(sp2_if, ip_list_b[0].address) - nb.assign_ip_to_interface(leaf_eth2, ip_list_b[1].address) - - # 11) Loopback /32 assignment - loopback_role = nb.nb.ipam.roles.get(slug="loopbackcontainer") - if not loopback_role: - print("ERROR: No IPAM role 'loopbackcontainer' found.") - sys.exit(1) - - loopback_pfxs = nb.nb.ipam.prefixes.filter(role_id=loopback_role.id, scope_id=site.id) - loopback_list = list(loopback_pfxs) - if not loopback_list: - print("ERROR: No loopback prefix found for this site.") - sys.exit(1) - - loopback_parent = loopback_list[0] - print(f"Using parent prefix '{loopback_parent.prefix}' for /32 loopback allocations.") - - for dev in spines + leaves: - # Get or create Loopback0 - loop0_if = nb.get_or_create_interface(dev.id, "Loopback0", "virtual") - if not loop0_if: - print(f"ERROR: Could not create/retrieve Loopback0 for {dev.name}") - continue - - child_32 = nb.allocate_prefix(loopback_parent, 32, site.id, loopback_role.id) - if not child_32: - print(f"ERROR: Could not allocate /32 for {dev.name}.") - continue - - ip_list_c = nb.get_available_ips_in_prefix(child_32) - if not ip_list_c: - print(f"ERROR: Not enough IP addresses in newly allocated /32 for {dev.name}.") - continue - - new_lo_ip = nb.assign_ip_to_interface(loop0_if, ip_list_c[0].address) - if new_lo_ip: - print(f"Assigned {new_lo_ip.address} to {dev.name} Loopback0.") - - print("\n=== Fabric Creation Completed ===") - print(f"Site: {site.name} (slug={site.slug})") - print("Spines:", [dev.name for dev in spines]) - print("Leaves:", [dev.name for dev in leaves]) - print("Access Switches:", [dev.name for dev in access_switches]) - print("Each leaf/spine link got a new /31, Loopback0 got a new /32, and ASNs were assigned.") - - -if __name__ == "__main__": - main() diff --git a/utilities/Create_Fabric/exceptions.py b/utilities/Create_Fabric/exceptions.py new file mode 100644 index 0000000..9d63bb9 --- /dev/null +++ b/utilities/Create_Fabric/exceptions.py @@ -0,0 +1,17 @@ +"""Exceptions personnalisées pour la création de fabric VXLAN""" + +class FabricError(Exception): + """Erreur de base pour la fabric""" + pass + +class DeviceCreationError(FabricError): + """Erreur lors de la création d'un équipement""" + pass + +class IPAllocationError(FabricError): + """Erreur lors de l'allocation d'adresses IP""" + pass + +class CablingError(FabricError): + """Erreur lors du câblage""" + pass \ No newline at end of file diff --git a/utilities/Create_Fabric/fabric_creator.py b/utilities/Create_Fabric/fabric_creator.py new file mode 100644 index 0000000..78d4c0c --- /dev/null +++ b/utilities/Create_Fabric/fabric_creator.py @@ -0,0 +1,414 @@ +""" +Classe principale pour la création de fabric VXLAN dans NetBox +""" + +import logging +from typing import Dict, List, Tuple, Optional + +from config import FabricConfig, DeviceRoles, IPRoles, InterfaceTypes +from exceptions import DeviceCreationError, IPAllocationError, CablingError, FabricError +from helpers.netbox_backend import NetBoxBackend + +logger = logging.getLogger(__name__) + +class VXLANFabricCreator: + """Gère la création d'une fabric VXLAN complète""" + + def __init__(self, netbox: NetBoxBackend): + """ + Initialise le créateur de fabric + + Args: + netbox: Instance de NetBoxBackend + """ + self.nb = netbox + self.config: Optional[FabricConfig] = None + self._roles: Dict = {} + self._ip_roles: Dict = {} + + def validate_prerequisites(self) -> bool: + """ + Valide tous les prérequis nécessaires + + Returns: + bool: True si tous les prérequis sont validés + + Raises: + DeviceCreationError: Si un rôle d'équipement est manquant + IPAllocationError: Si un rôle IP est manquant + """ + try: + # Validation des rôles d'équipements + for role in DeviceRoles: + role_obj = self.nb.get_device_role(role.value) + if not role_obj: + raise DeviceCreationError(f"Role {role.value} not found") + self._roles[role.value] = role_obj + + # Validation des rôles IP + for role in IPRoles: + ip_role = self.nb.nb.ipam.roles.get(slug=role.value) + if not ip_role: + raise IPAllocationError(f"IP role {role.value} not found") + self._ip_roles[role.value] = ip_role + + # Validation des types d'équipements + device_types = [ + self.config.spine_type, + self.config.leaf_type, + self.config.access_type + ] + for dev_type in device_types: + if not self.nb.get_device_type_by_slug(dev_type): + raise DeviceCreationError(f"Device type {dev_type} not found") + + return True + + except Exception as e: + logger.error(f"Validation failed: {str(e)}") + return False + + def get_or_create_site(self) -> dict: + """ + Sélectionne un site existant ou en crée un nouveau + + Returns: + dict: Objet site + + Raises: + DeviceCreationError: Si la création/sélection échoue + """ + existing_sites = self.nb.get_sites() + if not existing_sites: + raise DeviceCreationError("No sites found in NetBox") + + print("\nExisting Sites:") + for idx, site in enumerate(existing_sites, start=1): + print(f" {idx}. {site.name} (slug={site.slug})") + + choice = input("Choose site number or 'new': ").strip().lower() + + if choice == "new": + name = input("Site name: ").strip() + code = input("Site code: ").strip().upper() + if not all([name, code]): + raise DeviceCreationError("Site name and code required") + return self.nb.create_site(name, code.lower()) + + try: + return existing_sites[int(choice) - 1] + except (ValueError, IndexError): + raise DeviceCreationError("Invalid site selection") + + def create_spines(self, site) -> List[dict]: + """ + Crée les switches spine + + Args: + site: Objet site + + Returns: + List[dict]: Liste des spines créés + + Raises: + DeviceCreationError: Si la création échoue + """ + spines = [] + for i in range(1, 3): # Toujours 2 spines + name = f"{self.config.site_code.lower()}dc_sp{i}_00" + spine = self.nb.create_device( + name=name, + device_type_slug=self.config.spine_type, + role_id=self._roles[DeviceRoles.SPINE.value].id, + site_id=site.id + ) + if not spine: + raise DeviceCreationError(f"Failed to create spine {name}") + spines.append(spine) + logger.info(f"Created spine: {spine.name}") + return spines + + def create_building_pair(self, site, building_num) -> Tuple[dict, dict]: + """ + Crée une paire leaf/access pour un bâtiment + + Args: + site: Objet site + building_num: Numéro du bâtiment + + Returns: + Tuple[dict, dict]: Paire (leaf, access) + + Raises: + DeviceCreationError: Si la création échoue + """ + try: + # Création du location + location_name = f"{self.config.site_code}{building_num}" + location = self.nb.nb.dcim.locations.create( + name=location_name, + slug=location_name.lower(), + site=site.id + ) + + # Création Leaf + leaf_name = f"{self.config.site_code.lower()}{str(building_num).zfill(2)}_lf1_00" + leaf = self.nb.create_device( + name=leaf_name, + device_type_slug=self.config.leaf_type, + role_id=self._roles[DeviceRoles.LEAF.value].id, + site_id=site.id, + location_id=location.id + ) + if not leaf: + raise DeviceCreationError(f"Failed to create leaf {leaf_name}") + + # Création Access + access_name = f"{self.config.site_code.lower()}{str(building_num).zfill(2)}_sw1_00" + access = self.nb.create_device( + name=access_name, + device_type_slug=self.config.access_type, + role_id=self._roles[DeviceRoles.ACCESS.value].id, + site_id=site.id, + location_id=location.id + ) + if not access: + raise DeviceCreationError(f"Failed to create access switch {access_name}") + + logger.info(f"Created leaf/access pair: {leaf_name} / {access_name}") + return leaf, access + + except Exception as e: + raise DeviceCreationError(f"Failed to create building pair: {str(e)}") + + def setup_cabling(self, leaf, spines, access) -> Tuple[dict, dict, dict, dict]: + """ + Configure le câblage pour un ensemble leaf/spines/access + + Topologie: + - Spine1.EthX -> LeafY.Eth1 (où X = numéro du leaf) + - Spine2.EthX -> LeafY.Eth2 (où X = numéro du leaf) + - LeafY.Eth3 -> AccessY.Eth1 + + Args: + leaf: Objet leaf + spines: Liste des spines + access: Objet access switch + + Returns: + Tuple[dict, dict, dict, dict]: Interfaces connectées (leaf_if1, leaf_if2, spine1_if, spine2_if) + + Raises: + CablingError: Si le câblage échoue + """ + try: + # Extraire le numéro du leaf (ex: pa01_lf1_00 -> 1) + leaf_number = int(leaf.name.split('_')[0][-2:]) + + # Leaf -> Spine1 (Leaf.Eth1 -> Spine1.EthX) + leaf_if1 = self.nb.get_or_create_interface( + leaf.id, "Ethernet1", InterfaceTypes.QSFPP.value + ) + spine1_if = self.nb.get_or_create_interface( + spines[0].id, f"Ethernet{leaf_number}", InterfaceTypes.QSFPP.value + ) + if not self.nb.create_cable_if_not_exists(leaf_if1, spine1_if): + raise CablingError(f"Failed to cable {leaf_if1.name} to {spine1_if.name}") + logger.info(f"Connected {leaf.name}.Eth1 to {spines[0].name}.Eth{leaf_number}") + + # Leaf -> Spine2 (Leaf.Eth2 -> Spine2.EthX) + leaf_if2 = self.nb.get_or_create_interface( + leaf.id, "Ethernet2", InterfaceTypes.QSFPP.value + ) + spine2_if = self.nb.get_or_create_interface( + spines[1].id, f"Ethernet{leaf_number}", InterfaceTypes.QSFPP.value + ) + if not self.nb.create_cable_if_not_exists(leaf_if2, spine2_if): + raise CablingError(f"Failed to cable {leaf_if2.name} to {spine2_if.name}") + logger.info(f"Connected {leaf.name}.Eth2 to {spines[1].name}.Eth{leaf_number}") + + # Leaf -> Access (Leaf.Eth3 -> Access.Eth1) + leaf_if3 = self.nb.get_or_create_interface( + leaf.id, "Ethernet3", InterfaceTypes.QSFPP.value + ) + access_if = self.nb.get_or_create_interface( + access.id, "Ethernet1", InterfaceTypes.QSFPP.value + ) + if not self.nb.create_cable_if_not_exists(leaf_if3, access_if): + raise CablingError(f"Failed to cable {leaf_if3.name} to {access_if.name}") + logger.info(f"Connected {leaf.name}.Eth3 to {access.name}.Eth1") + + return leaf_if1, leaf_if2, spine1_if, spine2_if + + except ValueError as e: + raise CablingError(f"Invalid device naming format: {str(e)}") + except Exception as e: + raise CablingError(f"Cabling failed: {str(e)}") + + def setup_ip_addressing(self, site, interfaces, devices) -> None: + """ + Configure l'adressage IP pour les interfaces et les loopbacks + + Args: + site: Objet site + interfaces: Liste de tuples d'interfaces à connecter + devices: Liste des équipements nécessitant un loopback + + Raises: + IPAllocationError: Si l'allocation IP échoue + """ + try: + # Récupération des prefixes parents + underlay_pfxs = self.nb.nb.ipam.prefixes.filter( + role_id=self._ip_roles[IPRoles.UNDERLAY.value].id, + scope_id=site.id + ) + loopback_pfxs = self.nb.nb.ipam.prefixes.filter( + role_id=self._ip_roles[IPRoles.LOOPBACK.value].id, + scope_id=site.id + ) + + if not underlay_pfxs or not loopback_pfxs: + raise IPAllocationError("Missing required prefix pools") + + parent_prefix = list(underlay_pfxs)[0] + loopback_prefix = list(loopback_pfxs)[0] + + # Attribution des /31 pour les liens + for if_a, if_b in interfaces: + child_prefix = self.nb.allocate_prefix( + parent_prefix, 31, site.id, + self._ip_roles[IPRoles.UNDERLAY.value].id, + self.config.tenant_id + ) + if not child_prefix: + raise IPAllocationError(f"Failed to allocate /31 for {if_a.name}-{if_b.name}") + + ips = self.nb.get_available_ips_in_prefix(child_prefix) + if len(ips) < 2: + raise IPAllocationError(f"Not enough IPs in /31 for {if_a.name}-{if_b.name}") + + self.nb.assign_ip_to_interface(if_a, ips[0].address) + self.nb.assign_ip_to_interface(if_b, ips[1].address) + logger.info(f"Assigned IPs to {if_a.name}-{if_b.name}") + + # Attribution des loopbacks + for device in devices: + # Création de l'interface loopback + loopback = self.nb.get_or_create_interface( + device.id, + "Loopback0", + InterfaceTypes.VIRTUAL.value + ) + if not loopback: + raise IPAllocationError(f"Failed to create Loopback0 for {device.name}") + + # Allocation du /32 + loopback_32 = self.nb.allocate_prefix( + loopback_prefix, 32, site.id, + self._ip_roles[IPRoles.LOOPBACK.value].id, + self.config.tenant_id + ) + if not loopback_32: + raise IPAllocationError(f"Failed to allocate /32 for {device.name}") + + ips = self.nb.get_available_ips_in_prefix(loopback_32) + if not ips: + raise IPAllocationError(f"No IPs available in /32 for {device.name}") + + ip = self.nb.assign_ip_to_interface(loopback, ips[0].address) + if ip: + logger.info(f"Assigned {ip.address} to {device.name} Loopback0") + else: + raise IPAllocationError(f"Failed to assign IP to {device.name} Loopback0") + + except Exception as e: + raise IPAllocationError(f"IP addressing failed: {str(e)}") + + def assign_asns(self, spines: List[dict], leaves: List[dict]) -> None: + """ + Attribue les ASN aux équipements + + Args: + spines: Liste des spines + leaves: Liste des leaves + + Raises: + DeviceCreationError: Si l'attribution des ASN échoue + """ + try: + # ASNs pour les spines + for i, spine in enumerate(spines): + asn = self.config.base_spine_asn + i + if not self.nb.save_custom_fields(spine, {"ASN": asn}): + raise DeviceCreationError(f"Failed to assign ASN {asn} to {spine.name}") + logger.info(f"Assigned ASN {asn} to {spine.name}") + + # ASNs pour les leaves + for i, leaf in enumerate(leaves): + asn = self.config.base_leaf_asn + i + if not self.nb.save_custom_fields(leaf, {"ASN": asn}): + raise DeviceCreationError(f"Failed to assign ASN {asn} to {leaf.name}") + logger.info(f"Assigned ASN {asn} to {leaf.name}") + + except Exception as e: + raise DeviceCreationError(f"ASN assignment failed: {str(e)}") + + def create_fabric(self) -> Dict: + """ + Processus principal de création de la fabric + + Returns: + Dict: Résultat de la création + + Raises: + FabricError: Si la création échoue + """ + try: + if not self.validate_prerequisites(): + raise FabricError("Prerequisites validation failed") + + # 1. Création/sélection du site + site = self.get_or_create_site() + self.config.site_code = site.name[:2].upper() + + # 2. Création des équipements + spines = self.create_spines(site) + leaves = [] + access_switches = [] + interface_pairs = [] + + # 3. Création des paires leaf/access par bâtiment + for building in range(1, self.config.num_buildings + 1): + leaf, access = self.create_building_pair(site, building) + leaves.append(leaf) + access_switches.append(access) + + # 4. Câblage + interfaces = self.setup_cabling(leaf, spines, access) + interface_pairs.extend([ + (interfaces[0], interfaces[2]), # Leaf-Spine1 + (interfaces[1], interfaces[3]) # Leaf-Spine2 + ]) + + # 5. Configuration IP + self.setup_ip_addressing( + site=site, + interfaces=interface_pairs, + devices=spines + leaves # Loopbacks uniquement pour spines et leaves + ) + + # 6. Attribution des ASN + self.assign_asns(spines, leaves) + + logger.info("Fabric creation completed successfully") + return { + 'site': site, + 'spines': spines, + 'leaves': leaves, + 'access_switches': access_switches + } + + except Exception as e: + logger.error(f"Fabric creation failed: {str(e)}") + raise FabricError(f"Fabric creation failed: {str(e)}") \ No newline at end of file diff --git a/utilities/Create_Fabric/helpers/netbox_backend.py b/utilities/Create_Fabric/helpers/netbox_backend.py index 4426b7a..21fc7b9 100644 --- a/utilities/Create_Fabric/helpers/netbox_backend.py +++ b/utilities/Create_Fabric/helpers/netbox_backend.py @@ -4,223 +4,353 @@ NetBox_backend.py A Python class to interact with NetBox using pynetbox. """ +import logging import pynetbox -from typing import Optional, List, Dict +from datetime import datetime, timedelta +from enum import Enum +from typing import Optional, List, Dict, Any +from functools import wraps +# Configuration du logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +class InterfaceTypes(Enum): + QSFPP = "40gbase-x-qsfpp" + SFP = "1000base-x-sfp" + SFP_PLUS = "10gbase-x-sfpp" + QSFP28 = "100gbase-x-qsfp28" + +class DeviceStatus(Enum): + ACTIVE = "active" + PLANNED = "planned" + OFFLINE = "offline" + FAILED = "failed" + +def error_handler(func): + """Décorateur pour la gestion uniforme des erreurs""" + @wraps(func) + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except Exception as e: + logger.error(f"{func.__name__} failed: {str(e)}") + return None + return wrapper class NetBoxBackend: def __init__(self, url: str, token: str, verify_ssl: bool = True): """ Initializes the NetBox API connection. + + Args: + url (str): NetBox instance URL + token (str): API token for authentication + verify_ssl (bool): Whether to verify SSL certificates """ self.url = url self.token = token self.nb = pynetbox.api(self.url, token=self.token) self.nb.http_session.verify = verify_ssl + def validate_input(self, **kwargs) -> bool: + """ + Validates input parameters. + + Args: + **kwargs: Key-value pairs to validate + + Returns: + bool: True if all inputs are valid + + Raises: + ValueError: If any input is invalid + """ + for key, value in kwargs.items(): + if value is None or (isinstance(value, str) and not value.strip()): + raise ValueError(f"Invalid {key}: {value}") + return True + + def check_connection(self) -> bool: + """Verifies connection to NetBox instance""" + try: + self.nb.status() + return True + except Exception as e: + logger.error(f"Connection check failed: {e}") + return False + ## ---------------------------------- ## TENANTS MANAGEMENT ## ---------------------------------- + @error_handler def get_tenants(self) -> List: - """ Returns all tenants in NetBox. """ - try: - return list(self.nb.tenancy.tenants.all()) - except Exception as e: - print(f"[ERROR] Failed to fetch tenants: {e}") - return [] + """Returns all tenants in NetBox.""" + return list(self.nb.tenancy.tenants.all()) + @error_handler def create_tenant(self, name: str, slug: str): - """ Creates a new tenant in NetBox. """ - try: - return self.nb.tenancy.tenants.create({"name": name, "slug": slug}) - except Exception as e: - print(f"[ERROR] Failed to create tenant '{name}': {e}") - return None + """ + Creates a new tenant in NetBox. + + Args: + name (str): Tenant name + slug (str): URL-friendly slug + """ + self.validate_input(name=name, slug=slug) + return self.nb.tenancy.tenants.create({"name": name, "slug": slug}) ## ---------------------------------- ## SITES MANAGEMENT ## ---------------------------------- + @error_handler def get_sites(self) -> List: - """ Returns all sites in NetBox. """ - try: - return list(self.nb.dcim.sites.all()) - except Exception as e: - print(f"[ERROR] Failed to fetch sites: {e}") - return [] + """Returns all sites in NetBox.""" + return list(self.nb.dcim.sites.all()) - def create_site(self, name: str, slug: str): - """ Creates a new site in NetBox. """ - try: - return self.nb.dcim.sites.create({"name": name, "slug": slug}) - except Exception as e: - print(f"[ERROR] Failed to create site '{name}': {e}") - return None + @error_handler + def create_site(self, name: str, slug: str, status: str = "active"): + """ + Creates a new site in NetBox. + + Args: + name (str): Site name + slug (str): URL-friendly slug + status (str): Site status + """ + self.validate_input(name=name, slug=slug) + return self.nb.dcim.sites.create({ + "name": name, + "slug": slug, + "status": status + }) ## ---------------------------------- ## DEVICE MANAGEMENT ## ---------------------------------- + @error_handler def get_device_type_by_slug(self, slug: str) -> Optional[Dict]: - """ Returns a device type by slug. """ - try: - return self.nb.dcim.device_types.get(slug=slug) - except Exception as e: - print(f"[ERROR] Failed to fetch device type '{slug}': {e}") - return None + """Returns a device type by slug.""" + self.validate_input(slug=slug) + return self.nb.dcim.device_types.get(slug=slug) + @error_handler def get_device_role(self, slug: str) -> Optional[Dict]: - """ Returns a device role by slug. """ - try: - return self.nb.dcim.device_roles.get(slug=slug) - except Exception as e: - print(f"[ERROR] Failed to fetch device role '{slug}': {e}") + """Returns a device role by slug.""" + self.validate_input(slug=slug) + return self.nb.dcim.device_roles.get(slug=slug) + + def device_exists(self, name: str) -> bool: + """Checks if a device exists by name.""" + return bool(self.nb.dcim.devices.get(name=name)) + + @error_handler + def get_device_by_name(self, name: str) -> Optional[Dict]: + """Returns a device by name.""" + self.validate_input(name=name) + return self.nb.dcim.devices.get(name=name) + + @error_handler + def create_device(self, name: str, device_type_slug: str, role_id: int, + site_id: int, location_id: Optional[int] = None, + status: str = DeviceStatus.ACTIVE.value): + """ + Creates a device in NetBox if it doesn't already exist. + + Args: + name (str): Device name + device_type_slug (str): Device type slug + role_id (int): Role ID + site_id (int): Site ID + location_id (Optional[int]): Location ID + status (str): Device status + + Returns: + Device object if successful, None otherwise + """ + self.validate_input(name=name, device_type_slug=device_type_slug) + + if self.device_exists(name): + return self.get_device_by_name(name) + + device_type = self.get_device_type_by_slug(device_type_slug) + if not device_type: + logger.error(f"Device type '{device_type_slug}' not found") return None - - def create_device(self, name: str, device_type_slug: str, role_id: int, site_id: int, location_id: Optional[int] = None): - """ Creates a device in NetBox if it doesn't already exist. """ + try: - existing_device = self.nb.dcim.devices.get(name=name) - if existing_device: - return existing_device - - device_type = self.get_device_type_by_slug(device_type_slug) - if not device_type: - print(f"[ERROR] Device type '{device_type_slug}' not found.") - return None - - return self.nb.dcim.devices.create({ + device_data = { "name": name, - "device_type": {"id": device_type.id}, + "device_type": device_type.id, "role": role_id, "site": site_id, - "location": location_id - }) + "location": location_id, + "status": status + } + + # Debug + logger.debug("Creating device with data:") + for key, value in device_data.items(): + logger.debug(f"{key}: {value} (type: {type(value)})") + + return self.nb.dcim.devices.create(device_data) + + except (ValueError, AttributeError) as e: + logger.error(f"Error preparing device data: {str(e)}") + return None except Exception as e: - print(f"[ERROR] Failed to create device '{name}': {e}") + logger.error(f"Error creating device: {str(e)}") return None ## ---------------------------------- ## INTERFACES & CABLING ## ---------------------------------- - def get_or_create_interface(self, device_id: int, if_name: str, if_type: str = "40gbase-x-qsfpp"): - """ Retrieves or creates an interface on a given device. """ - try: - intf = self.nb.dcim.interfaces.get(device_id=device_id, name=if_name) - if intf: - return intf - return self.nb.dcim.interfaces.create({ - "device": device_id, - "name": if_name, - "type": if_type, - }) - except Exception as e: - print(f"[ERROR] Failed to create/get interface '{if_name}': {e}") + @error_handler + def get_or_create_interface(self, device_id: int, if_name: str, + if_type: str = InterfaceTypes.QSFPP.value): + """ + Retrieves or creates an interface on a given device. + + Args: + device_id (int): Device ID + if_name (str): Interface name + if_type (str): Interface type + """ + self.validate_input(if_name=if_name) + + intf = self.nb.dcim.interfaces.get(device_id=device_id, name=if_name) + if intf: + return intf + + return self.nb.dcim.interfaces.create({ + "device": device_id, + "name": if_name, + "type": if_type, + }) + + @error_handler + def create_cable_if_not_exists(self, intf_a, intf_b): + """ + Creates a cable between two interfaces if it doesn't exist. + + Args: + intf_a: First interface object + intf_b: Second interface object + """ + if not all([intf_a, intf_b]): + logger.warning("Missing interfaces to create cable") return None - def create_cable_if_not_exists(self, intf_a, intf_b): - """ Creates a cable between two interfaces if it doesn't exist. """ - if not intf_a or not intf_b: - print("[WARN] Missing interfaces to create cable.") - return None - try: - return self.nb.dcim.cables.create({ - "a_terminations": [{"object_type": "dcim.interface", "object_id": intf_a.id}], - "b_terminations": [{"object_type": "dcim.interface", "object_id": intf_b.id}], - "status": "connected", - }) - except Exception as e: - print(f"[ERROR] Failed to create cable: {e}") - return None + return self.nb.dcim.cables.create({ + "a_terminations": [{"object_type": "dcim.interface", "object_id": intf_a.id}], + "b_terminations": [{"object_type": "dcim.interface", "object_id": intf_b.id}], + "status": "connected", + }) ## ---------------------------------- ## NETWORK MANAGEMENT ## ---------------------------------- - def create_vlan(self, vlan_id: int, vlan_name: str, slug:str, tenant_id: str): - """ Creates a VLAN in NetBox. """ - try: - return self.nb.ipam.vlans.create({ - "vid": vlan_id, - "name": vlan_name, - "slug": slug, - "tenant": tenant_id, - }) - except Exception as e: - print(f"[ERROR] Failed to create VLAN '{vlan_name}': {e}") - return None + @error_handler + def create_vlan(self, vlan_id: int, vlan_name: str, slug: str, tenant_id: str): + """Creates a VLAN in NetBox.""" + self.validate_input(vlan_name=vlan_name, slug=slug) + return self.nb.ipam.vlans.create({ + "vid": vlan_id, + "name": vlan_name, + "slug": slug, + "tenant": tenant_id, + }) + @error_handler def create_l2vpn(self, vni_id: int, vpn_name: str, slug: str, tenant_id: str): - """ Creates an L2VPN in NetBox. """ - try: - return self.nb.vpn.l2vpns.create({ - "name": vpn_name, - "slug": slug, - "type": "vxlan-evpn", - "tenant": tenant_id, - "identifier": vni_id - }) - except Exception as e: - print(f"[ERROR] Failed to create L2VPN '{vpn_name}': {e}") - return None + """Creates an L2VPN in NetBox.""" + self.validate_input(vpn_name=vpn_name, slug=slug) + return self.nb.vpn.l2vpns.create({ + "name": vpn_name, + "slug": slug, + "type": "vxlan-evpn", + "tenant": tenant_id, + "identifier": vni_id + }) - def create_vxlan_termination(self, l2vpn_id: int, assigned_object_type: str, assigned_object_id: int): - """ Creates a VXLAN termination for L2VPN. """ - try: - return self.nb.vpn.l2vpn_terminations.create({ - "l2vpn": l2vpn_id, - "assigned_object_type": assigned_object_type, - "assigned_object_id": assigned_object_id - }) - except Exception as e: - print(f"[ERROR] Failed to create VXLAN termination: {e}") - return None + @error_handler + def create_vxlan_termination(self, l2vpn_id: int, assigned_object_type: str, + assigned_object_id: int): + """Creates a VXLAN termination for L2VPN.""" + return self.nb.vpn.l2vpn_terminations.create({ + "l2vpn": l2vpn_id, + "assigned_object_type": assigned_object_type, + "assigned_object_id": assigned_object_id + }) + + @error_handler + def allocate_prefix(self, parent_prefix, prefix_length: int, + site_id: int, role_id: int, tenant_id: int): + """ + Allocates a child subnet from a parent prefix. - def allocate_prefix(self, parent_prefix, prefix_length: int, site_id: int, role_id: int): + Args: + parent_prefix: Parent prefix object + prefix_length (int): Length of the child prefix + site_id (int): Site ID + role_id (int): Role ID + tenant_id (int): Tenant ID """ - Alloue un sous-réseau enfant (ex: /31 ou /32) à partir d'un préfixe parent - via available_prefixes.create(). - """ - try: - child_prefix = parent_prefix.available_prefixes.create({ - "prefix_length": prefix_length, - "site": site_id, - "role": role_id, - }) - return child_prefix - except Exception as exc: - print(f"[ERROR] Echec de l'allocation d'un /{prefix_length} pour {parent_prefix.prefix}: {exc}") - return None + return parent_prefix.available_prefixes.create({ + "prefix_length": prefix_length, + "site": site_id, + "role": role_id, + "tenant": tenant_id + }) - def assign_ip_to_interface(self, interface, ip_address: str, status: str = "active"): - """ Assigns an IP address to an interface. """ - try: - return self.nb.ipam.ip_addresses.create({ - "address": ip_address, - "assigned_object_id": interface.id, - "assigned_object_type": "dcim.interface", - "status": status, - }) - except Exception as e: - print(f"[ERROR] Failed to assign IP {ip_address}: {e}") - return None + @error_handler + def assign_ip_to_interface(self, interface, ip_address: str, + status: str = DeviceStatus.ACTIVE.value): + """Assigns an IP address to an interface.""" + self.validate_input(ip_address=ip_address) + return self.nb.ipam.ip_addresses.create({ + "address": ip_address, + "assigned_object_id": interface.id, + "assigned_object_type": "dcim.interface", + "status": status, + }) + @error_handler def get_available_ips_in_prefix(self, prefix) -> List: - """ Fetches available IPs within a prefix. """ + """Fetches available IPs within a prefix.""" if not hasattr(prefix, "available_ips"): - print(f"[ERROR] Invalid prefix object: {prefix}") + logger.error(f"Invalid prefix object: {prefix}") return [] return list(prefix.available_ips.list()) - def save_custom_fields(self, device, fields: Dict[str, any]): - """ Saves custom fields for a device. """ - try: - for key, value in fields.items(): - device.custom_fields[key] = value - device.save() - return True - except Exception as e: - print(f"[ERROR] Failed to save custom fields: {e}") - return False + @error_handler + def save_custom_fields(self, device, fields: Dict[str, Any]) -> bool: + """ + Saves custom fields for a device. + + Args: + device: Device object + fields (Dict[str, Any]): Custom fields to save + """ + for key, value in fields.items(): + device.custom_fields[key] = value + device.save() + return True + + @error_handler + def cleanup_unused_devices(self, older_than_days: int = 30): + """ + Removes devices that haven't been updated in specified days. + + Args: + older_than_days (int): Number of days of inactivity + """ + cutoff = datetime.now() - timedelta(days=older_than_days) + devices = self.nb.dcim.devices.filter(last_updated__lt=cutoff) + for device in devices: + device.delete() \ No newline at end of file diff --git a/utilities/Create_Fabric/main.py b/utilities/Create_Fabric/main.py new file mode 100644 index 0000000..f47c55e --- /dev/null +++ b/utilities/Create_Fabric/main.py @@ -0,0 +1,117 @@ +""" +Script principal pour la création de fabric VXLAN dans NetBox +""" + +import sys +import getpass +import logging +from typing import Dict + +from config import FabricConfig +from exceptions import FabricError +from fabric_creator import VXLANFabricCreator +from helpers.netbox_backend import NetBoxBackend + +# Configuration du logging +logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +def get_user_input() -> Dict: + """ + Récupère les entrées utilisateur pour la configuration de la fabric + + Returns: + Dict: Dictionnaire contenant les paramètres de configuration + + Raises: + FabricError: Si les entrées sont invalides + """ + try: + return { + 'netbox_url': input("NetBox URL: ").strip(), + 'netbox_token': getpass.getpass("NetBox API Token: "), + 'num_buildings': int(input("Number of buildings (1-5): ").strip()), + 'spine_type_slug': input("Spine device type slug: ").strip(), + 'leaf_type_slug': input("Leaf device type slug: ").strip(), + 'access_type_slug': input("Access switch device type slug: ").strip(), + } + except ValueError as error: + raise FabricError(f"Invalid input: {str(error)}") + +def validate_input(fabric_config: Dict) -> None: + """ + Valide les entrées utilisateur pour la configuration de la fabric + + Args: + fabric_config: Dictionnaire contenant les paramètres de configuration + + Raises: + FabricError: Si la validation échoue + """ + # Validation des paramètres de connexion NetBox + if not all([fabric_config['netbox_url'], fabric_config['netbox_token']]): + raise FabricError("NetBox URL and token are required") + + # Validation du nombre de bâtiments + if not 1 <= fabric_config['num_buildings'] <= 5: + raise FabricError("Number of buildings must be between 1 and 5") + + # Validation des types d'équipements + required_device_types = [ + fabric_config['spine_type_slug'], + fabric_config['leaf_type_slug'], + fabric_config['access_type_slug'] + ] + if not all(required_device_types): + raise FabricError("All device type slugs are required") + +def main(): + """Point d'entrée principal du script""" + try: + # 1. Récupération des entrées utilisateur + fabric_config = get_user_input() + validate_input(fabric_config) + + # 2. Initialisation de la connexion NetBox + netbox_backend = NetBoxBackend( + fabric_config['netbox_url'], + fabric_config['netbox_token'] + ) + + if not netbox_backend.check_connection(): + raise FabricError("Failed to connect to NetBox") + + # 3. Configuration de la fabric + fabric_settings = FabricConfig( + site_code="", # Sera défini plus tard + num_buildings=fabric_config['num_buildings'], + spine_type=fabric_config['spine_type_slug'], + leaf_type=fabric_config['leaf_type_slug'], + access_type=fabric_config['access_type_slug'] + ) + + # 4. Création de la fabric + fabric_creator = VXLANFabricCreator(netbox_backend) + fabric_creator.config = fabric_settings + fabric_result = fabric_creator.create_fabric() + + # 5. Affichage du résultat + logger.info("=== Fabric Creation Completed ===") + logger.info(f"Site: {fabric_result['site'].name}") + logger.info("Spines: %s", [spine.name for spine in fabric_result['spines']]) + logger.info("Leaves: %s", [leaf.name for leaf in fabric_result['leaves']]) + logger.info("Access Switches: %s", + [access.name for access in fabric_result['access_switches']]) + + except FabricError as fabric_error: + logger.error("Fabric creation error: %s", str(fabric_error)) + sys.exit(1) + except Exception as unexpected_error: + logger.error("Unexpected error: %s", str(unexpected_error)) + sys.exit(1) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/utilities/VPN/Customers.yml b/utilities/VPN/Customers.yml index 958f552..843795d 100644 --- a/utilities/VPN/Customers.yml +++ b/utilities/VPN/Customers.yml @@ -3,7 +3,7 @@ Tenant: Subnets: 10.100.0.0/24 VLAN: 100 VNI: 1100 - - Name: Purpel + - Name: Purple Subnets: 10.50.0.0/24 VLAN: 50 VNI: 1050 \ No newline at end of file diff --git a/utilities/import.py b/utilities/import.py index 2b1308d..2235fe3 100644 --- a/utilities/import.py +++ b/utilities/import.py @@ -53,6 +53,32 @@ def get_or_create_device_role(netbox_url, headers, role): print(f"[INFO] Device Role '{name}' created (ID={created['id']}).") return created +def get_or_create_interface_template(netbox_url, headers, device_type_id, interface_data): + """ + Create interface template for a device type if it doesn't exist + """ + name = interface_data["name"] + url = f"{netbox_url}/api/dcim/interface-templates/?name={name}&devicetype_id={device_type_id}" + resp = requests.get(url, headers=headers) + resp.raise_for_status() + results = resp.json()["results"] + + if results: + print(f"[INFO] Interface template '{name}' already exists for device type ID={device_type_id}") + return results[0] + + create_url = f"{netbox_url}/api/dcim/interface-templates/" + payload = { + "device_type": device_type_id, + "name": name, + "type": interface_data["type"], + "mgmt_only": interface_data.get("mgmt_only", False) + } + resp = requests.post(create_url, headers=headers, json=payload) + resp.raise_for_status() + created = resp.json() + print(f"[INFO] Interface template '{name}' created for device type ID={device_type_id}") + return created def get_or_create_device_type(netbox_url, headers, device_type, manufacturers_cache): manufacturer_slug = device_type["manufacturer"] @@ -91,7 +117,12 @@ def get_or_create_device_type(netbox_url, headers, device_type, manufacturers_ca print( f"[INFO] Device Type '{created_dt['model']}' created (ID={created_dt['id']})." ) + # Create interface templates if defined + if "interfaces" in device_type: + for interface in device_type["interfaces"]: + get_or_create_interface_template(netbox_url, headers, created_dt["id"], interface) + return created_dt ######################################## # Subnets import @@ -211,8 +242,19 @@ def create_container_prefix(netbox_url, headers, cidr, description, role_id, sit # Divers Creation ######################################## -def get_or_create_custom_field(netbox_url, headers): - field_name = "ASN" +def get_or_create_custom_field(netbox_url, headers, field_name, object_types, field_type, label=None, description=None, related_object_type=None): + """ + Create a custom field in NetBox. + Args: + netbox_url (str): The NetBox URL + headers (dict): Request headers + field_name (str): Name of the custom field + object_types (list): List of object types (e.g. ['dcim.device']) + field_type (str): Type of field (e.g. 'integer', 'text', 'boolean', 'object') + label (str, optional): Label for the field. Defaults to field_name + description (str, optional): Description of the field. Defaults to field_name + related_object_type (str, optional): Required for object type fields (e.g. 'ipam.vlan') + """ url = f"{netbox_url}/api/extras/custom-fields/" # Check if the custom field already exists @@ -223,21 +265,32 @@ def get_or_create_custom_field(netbox_url, headers): print(f"[INFO] Custom field '{field_name}' already exists.") return + # Set defaults if not provided + label = label or field_name + description = description or field_name + # Define the custom field payload custom_field_data = { "name": field_name, - "label": "ASN", - "type": "integer", - "description": "ASN", + "label": label, + "type": field_type, + "description": description, "required": False, "default": "", "weight": 100, "filter_logic": "loose", "ui_visible": "always", "is_cloneable": True, - "object_types": ["dcim.device"], + "object_types": object_types, + "related_object_type": related_object_type } + # Add related_object_type if field_type is 'object' + if field_type == "object" and related_object_type: + custom_field_data["object_type"] = related_object_type + elif field_type == "object" and not related_object_type: + raise ValueError("related_object_type is required for object type fields") + # Create the custom field create_response = requests.post(url, headers=headers, json=custom_field_data) if create_response.status_code == 201: @@ -277,7 +330,8 @@ def main(): subnets_data = yaml.safe_load(f) # Divers Creation - get_or_create_custom_field(netbox_url, headers) + get_or_create_custom_field(netbox_url, headers, "ASN", ["dcim.device"], "integer", "ASN", "Autonomous System Number") + #get_or_create_custom_field(netbox_url, headers, "Customer", ["dcim.interface"], "object", "Customer", "Customer Name", "tenancy.tenant") ###################################################### # device_model.yml : manufacturers, roles, types @@ -297,7 +351,9 @@ def main(): if "device_types" in device_model_data: for dt in device_model_data["device_types"]: - get_or_create_device_type(netbox_url, headers, dt, manufacturers_cache) + device_type = get_or_create_device_type(netbox_url, headers, dt, manufacturers_cache) + if not device_type: + print(f"[ERROR] Failed to create device type for {dt.get('model', 'unknown')}") ###################################################### # subnets.yml : Region, Site, Containers, etc.