Files
projet-vxlan-automation/utilities/Create_Fabric/helpers/netbox_backend.py
D. Arnodo c8daee6c11 Feat/leaf template (#7)
* fix(template) : Change `Ethernet3`configuration
* fix(Template): Missing Underlay configuration
* Fix(template) : error 'ipam.models.ip.IPAddress DoesNotExist
* fix(routing) : ! IP routing not enabled
* fix(leaves template): adding ip routing and multi-agent model
2025-03-28 16:52:37 +01:00

356 lines
11 KiB
Python

"""
NetBox_backend.py
=================
A Python class to interact with NetBox using pynetbox.
"""
import logging
import pynetbox
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional, List, Dict, Any
from functools import wraps
# Configuration du logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class InterfaceTypes(Enum):
QSFPP = "40gbase-x-qsfpp"
SFP = "1000base-x-sfp"
SFP_PLUS = "10gbase-x-sfpp"
QSFP28 = "100gbase-x-qsfp28"
class DeviceStatus(Enum):
ACTIVE = "active"
PLANNED = "planned"
OFFLINE = "offline"
FAILED = "failed"
def error_handler(func):
"""Décorateur pour la gestion uniforme des erreurs"""
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
logger.error(f"{func.__name__} failed: {str(e)}")
return None
return wrapper
class NetBoxBackend:
def __init__(self, url: str, token: str, verify_ssl: bool = True):
"""
Initializes the NetBox API connection.
Args:
url (str): NetBox instance URL
token (str): API token for authentication
verify_ssl (bool): Whether to verify SSL certificates
"""
self.url = url
self.token = token
self.nb = pynetbox.api(self.url, token=self.token)
self.nb.http_session.verify = verify_ssl
def validate_input(self, **kwargs) -> bool:
"""
Validates input parameters.
Args:
**kwargs: Key-value pairs to validate
Returns:
bool: True if all inputs are valid
Raises:
ValueError: If any input is invalid
"""
for key, value in kwargs.items():
if value is None or (isinstance(value, str) and not value.strip()):
raise ValueError(f"Invalid {key}: {value}")
return True
def check_connection(self) -> bool:
"""Verifies connection to NetBox instance"""
try:
self.nb.status()
return True
except Exception as e:
logger.error(f"Connection check failed: {e}")
return False
## ----------------------------------
## TENANTS MANAGEMENT
## ----------------------------------
@error_handler
def get_tenants(self) -> List:
"""Returns all tenants in NetBox."""
return list(self.nb.tenancy.tenants.all())
@error_handler
def create_tenant(self, name: str, slug: str):
"""
Creates a new tenant in NetBox.
Args:
name (str): Tenant name
slug (str): URL-friendly slug
"""
self.validate_input(name=name, slug=slug)
return self.nb.tenancy.tenants.create({"name": name, "slug": slug})
## ----------------------------------
## SITES MANAGEMENT
## ----------------------------------
@error_handler
def get_sites(self) -> List:
"""Returns all sites in NetBox."""
return list(self.nb.dcim.sites.all())
@error_handler
def create_site(self, name: str, slug: str, status: str = "active"):
"""
Creates a new site in NetBox.
Args:
name (str): Site name
slug (str): URL-friendly slug
status (str): Site status
"""
self.validate_input(name=name, slug=slug)
return self.nb.dcim.sites.create({
"name": name,
"slug": slug,
"status": status
})
## ----------------------------------
## DEVICE MANAGEMENT
## ----------------------------------
@error_handler
def get_device_type_by_slug(self, slug: str) -> Optional[Dict]:
"""Returns a device type by slug."""
self.validate_input(slug=slug)
return self.nb.dcim.device_types.get(slug=slug)
@error_handler
def get_device_role(self, slug: str) -> Optional[Dict]:
"""Returns a device role by slug."""
self.validate_input(slug=slug)
return self.nb.dcim.device_roles.get(slug=slug)
def device_exists(self, name: str) -> bool:
"""Checks if a device exists by name."""
return bool(self.nb.dcim.devices.get(name=name))
@error_handler
def get_device_by_name(self, name: str) -> Optional[Dict]:
"""Returns a device by name."""
self.validate_input(name=name)
return self.nb.dcim.devices.get(name=name)
@error_handler
def create_device(self, name: str, device_type_slug: str, role_id: int,
site_id: int, location_id: Optional[int] = None,
status: str = DeviceStatus.ACTIVE.value):
"""
Creates a device in NetBox if it doesn't already exist.
Args:
name (str): Device name
device_type_slug (str): Device type slug
role_id (int): Role ID
site_id (int): Site ID
location_id (Optional[int]): Location ID
status (str): Device status
Returns:
Device object if successful, None otherwise
"""
self.validate_input(name=name, device_type_slug=device_type_slug)
if self.device_exists(name):
return self.get_device_by_name(name)
device_type = self.get_device_type_by_slug(device_type_slug)
if not device_type:
logger.error(f"Device type '{device_type_slug}' not found")
return None
try:
device_data = {
"name": name,
"device_type": device_type.id,
"role": role_id,
"site": site_id,
"location": location_id,
"status": status
}
# Debug
logger.debug("Creating device with data:")
for key, value in device_data.items():
logger.debug(f"{key}: {value} (type: {type(value)})")
return self.nb.dcim.devices.create(device_data)
except (ValueError, AttributeError) as e:
logger.error(f"Error preparing device data: {str(e)}")
return None
except Exception as e:
logger.error(f"Error creating device: {str(e)}")
return None
## ----------------------------------
## INTERFACES & CABLING
## ----------------------------------
@error_handler
def get_or_create_interface(self, device_id: int, if_name: str,
if_type: str = InterfaceTypes.QSFPP.value):
"""
Retrieves or creates an interface on a given device.
Args:
device_id (int): Device ID
if_name (str): Interface name
if_type (str): Interface type
"""
self.validate_input(if_name=if_name)
intf = self.nb.dcim.interfaces.get(device_id=device_id, name=if_name)
if intf:
return intf
return self.nb.dcim.interfaces.create({
"device": device_id,
"name": if_name,
"type": if_type,
})
@error_handler
def create_cable_if_not_exists(self, intf_a, intf_b):
"""
Creates a cable between two interfaces if it doesn't exist.
Args:
intf_a: First interface object
intf_b: Second interface object
"""
if not all([intf_a, intf_b]):
logger.warning("Missing interfaces to create cable")
return None
return self.nb.dcim.cables.create({
"a_terminations": [{"object_type": "dcim.interface", "object_id": intf_a.id}],
"b_terminations": [{"object_type": "dcim.interface", "object_id": intf_b.id}],
"status": "connected",
})
## ----------------------------------
## NETWORK MANAGEMENT
## ----------------------------------
@error_handler
def create_vlan(self, vlan_id: int, vlan_name: str, slug: str, tenant_id: str):
"""Creates a VLAN in NetBox."""
self.validate_input(vlan_name=vlan_name, slug=slug)
return self.nb.ipam.vlans.create({
"vid": vlan_id,
"name": vlan_name,
"slug": slug,
"tenant": tenant_id,
})
@error_handler
def create_l2vpn(self, vni_id: int, vpn_name: str, slug: str, tenant_id: str):
"""Creates an L2VPN in NetBox."""
self.validate_input(vpn_name=vpn_name, slug=slug)
return self.nb.vpn.l2vpns.create({
"name": vpn_name,
"slug": slug,
"type": "vxlan-evpn",
"tenant": tenant_id,
"identifier": vni_id
})
@error_handler
def create_vxlan_termination(self, l2vpn_id: int, assigned_object_type: str,
assigned_object_id: int):
"""Creates a VXLAN termination for L2VPN."""
return self.nb.vpn.l2vpn_terminations.create({
"l2vpn": l2vpn_id,
"assigned_object_type": assigned_object_type,
"assigned_object_id": assigned_object_id
})
@error_handler
def allocate_prefix(self, parent_prefix, prefix_length: int,
site_id: int, role_id: int, tenant_id: int):
"""
Allocates a child subnet from a parent prefix.
Args:
parent_prefix: Parent prefix object
prefix_length (int): Length of the child prefix
site_id (int): Site ID
role_id (int): Role ID
tenant_id (int): Tenant ID
"""
return parent_prefix.available_prefixes.create({
"prefix_length": prefix_length,
"site": site_id,
"role": role_id,
"tenant": tenant_id
})
@error_handler
def assign_ip_to_interface(self, interface, ip_address: str,
status: str = DeviceStatus.ACTIVE.value):
"""Assigns an IP address to an interface."""
self.validate_input(ip_address=ip_address)
return self.nb.ipam.ip_addresses.create({
"address": ip_address,
"assigned_object_id": interface.id,
"assigned_object_type": "dcim.interface",
"status": status,
})
@error_handler
def get_available_ips_in_prefix(self, prefix) -> List:
"""Fetches available IPs within a prefix."""
if not hasattr(prefix, "available_ips"):
logger.error(f"Invalid prefix object: {prefix}")
return []
return list(prefix.available_ips.list())
@error_handler
def save_custom_fields(self, device, fields: Dict[str, Any]) -> bool:
"""
Saves custom fields for a device.
Args:
device: Device object
fields (Dict[str, Any]): Custom fields to save
"""
for key, value in fields.items():
device.custom_fields[key] = value
device.save()
return True
@error_handler
def cleanup_unused_devices(self, older_than_days: int = 30):
"""
Removes devices that haven't been updated in specified days.
Args:
older_than_days (int): Number of days of inactivity
"""
cutoff = datetime.now() - timedelta(days=older_than_days)
devices = self.nb.dcim.devices.filter(last_updated__lt=cutoff)
for device in devices:
device.delete()