* fix(template) : Change `Ethernet3`configuration * fix(Template): Missing Underlay configuration * Fix(template) : error 'ipam.models.ip.IPAddress DoesNotExist * fix(routing) : ! IP routing not enabled * fix(leaves template): adding ip routing and multi-agent model
356 lines
11 KiB
Python
356 lines
11 KiB
Python
"""
|
|
NetBox_backend.py
|
|
=================
|
|
A Python class to interact with NetBox using pynetbox.
|
|
"""
|
|
|
|
import logging
|
|
import pynetbox
|
|
from datetime import datetime, timedelta
|
|
from enum import Enum
|
|
from typing import Optional, List, Dict, Any
|
|
from functools import wraps
|
|
|
|
# Configuration du logging
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class InterfaceTypes(Enum):
|
|
QSFPP = "40gbase-x-qsfpp"
|
|
SFP = "1000base-x-sfp"
|
|
SFP_PLUS = "10gbase-x-sfpp"
|
|
QSFP28 = "100gbase-x-qsfp28"
|
|
|
|
class DeviceStatus(Enum):
|
|
ACTIVE = "active"
|
|
PLANNED = "planned"
|
|
OFFLINE = "offline"
|
|
FAILED = "failed"
|
|
|
|
def error_handler(func):
|
|
"""Décorateur pour la gestion uniforme des erreurs"""
|
|
@wraps(func)
|
|
def wrapper(*args, **kwargs):
|
|
try:
|
|
return func(*args, **kwargs)
|
|
except Exception as e:
|
|
logger.error(f"{func.__name__} failed: {str(e)}")
|
|
return None
|
|
return wrapper
|
|
|
|
class NetBoxBackend:
|
|
def __init__(self, url: str, token: str, verify_ssl: bool = True):
|
|
"""
|
|
Initializes the NetBox API connection.
|
|
|
|
Args:
|
|
url (str): NetBox instance URL
|
|
token (str): API token for authentication
|
|
verify_ssl (bool): Whether to verify SSL certificates
|
|
"""
|
|
self.url = url
|
|
self.token = token
|
|
self.nb = pynetbox.api(self.url, token=self.token)
|
|
self.nb.http_session.verify = verify_ssl
|
|
|
|
def validate_input(self, **kwargs) -> bool:
|
|
"""
|
|
Validates input parameters.
|
|
|
|
Args:
|
|
**kwargs: Key-value pairs to validate
|
|
|
|
Returns:
|
|
bool: True if all inputs are valid
|
|
|
|
Raises:
|
|
ValueError: If any input is invalid
|
|
"""
|
|
for key, value in kwargs.items():
|
|
if value is None or (isinstance(value, str) and not value.strip()):
|
|
raise ValueError(f"Invalid {key}: {value}")
|
|
return True
|
|
|
|
def check_connection(self) -> bool:
|
|
"""Verifies connection to NetBox instance"""
|
|
try:
|
|
self.nb.status()
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Connection check failed: {e}")
|
|
return False
|
|
|
|
## ----------------------------------
|
|
## TENANTS MANAGEMENT
|
|
## ----------------------------------
|
|
|
|
@error_handler
|
|
def get_tenants(self) -> List:
|
|
"""Returns all tenants in NetBox."""
|
|
return list(self.nb.tenancy.tenants.all())
|
|
|
|
@error_handler
|
|
def create_tenant(self, name: str, slug: str):
|
|
"""
|
|
Creates a new tenant in NetBox.
|
|
|
|
Args:
|
|
name (str): Tenant name
|
|
slug (str): URL-friendly slug
|
|
"""
|
|
self.validate_input(name=name, slug=slug)
|
|
return self.nb.tenancy.tenants.create({"name": name, "slug": slug})
|
|
|
|
## ----------------------------------
|
|
## SITES MANAGEMENT
|
|
## ----------------------------------
|
|
|
|
@error_handler
|
|
def get_sites(self) -> List:
|
|
"""Returns all sites in NetBox."""
|
|
return list(self.nb.dcim.sites.all())
|
|
|
|
@error_handler
|
|
def create_site(self, name: str, slug: str, status: str = "active"):
|
|
"""
|
|
Creates a new site in NetBox.
|
|
|
|
Args:
|
|
name (str): Site name
|
|
slug (str): URL-friendly slug
|
|
status (str): Site status
|
|
"""
|
|
self.validate_input(name=name, slug=slug)
|
|
return self.nb.dcim.sites.create({
|
|
"name": name,
|
|
"slug": slug,
|
|
"status": status
|
|
})
|
|
|
|
## ----------------------------------
|
|
## DEVICE MANAGEMENT
|
|
## ----------------------------------
|
|
|
|
@error_handler
|
|
def get_device_type_by_slug(self, slug: str) -> Optional[Dict]:
|
|
"""Returns a device type by slug."""
|
|
self.validate_input(slug=slug)
|
|
return self.nb.dcim.device_types.get(slug=slug)
|
|
|
|
@error_handler
|
|
def get_device_role(self, slug: str) -> Optional[Dict]:
|
|
"""Returns a device role by slug."""
|
|
self.validate_input(slug=slug)
|
|
return self.nb.dcim.device_roles.get(slug=slug)
|
|
|
|
def device_exists(self, name: str) -> bool:
|
|
"""Checks if a device exists by name."""
|
|
return bool(self.nb.dcim.devices.get(name=name))
|
|
|
|
@error_handler
|
|
def get_device_by_name(self, name: str) -> Optional[Dict]:
|
|
"""Returns a device by name."""
|
|
self.validate_input(name=name)
|
|
return self.nb.dcim.devices.get(name=name)
|
|
|
|
@error_handler
|
|
def create_device(self, name: str, device_type_slug: str, role_id: int,
|
|
site_id: int, location_id: Optional[int] = None,
|
|
status: str = DeviceStatus.ACTIVE.value):
|
|
"""
|
|
Creates a device in NetBox if it doesn't already exist.
|
|
|
|
Args:
|
|
name (str): Device name
|
|
device_type_slug (str): Device type slug
|
|
role_id (int): Role ID
|
|
site_id (int): Site ID
|
|
location_id (Optional[int]): Location ID
|
|
status (str): Device status
|
|
|
|
Returns:
|
|
Device object if successful, None otherwise
|
|
"""
|
|
self.validate_input(name=name, device_type_slug=device_type_slug)
|
|
|
|
if self.device_exists(name):
|
|
return self.get_device_by_name(name)
|
|
|
|
device_type = self.get_device_type_by_slug(device_type_slug)
|
|
if not device_type:
|
|
logger.error(f"Device type '{device_type_slug}' not found")
|
|
return None
|
|
|
|
try:
|
|
device_data = {
|
|
"name": name,
|
|
"device_type": device_type.id,
|
|
"role": role_id,
|
|
"site": site_id,
|
|
"location": location_id,
|
|
"status": status
|
|
}
|
|
|
|
# Debug
|
|
logger.debug("Creating device with data:")
|
|
for key, value in device_data.items():
|
|
logger.debug(f"{key}: {value} (type: {type(value)})")
|
|
|
|
return self.nb.dcim.devices.create(device_data)
|
|
|
|
except (ValueError, AttributeError) as e:
|
|
logger.error(f"Error preparing device data: {str(e)}")
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Error creating device: {str(e)}")
|
|
return None
|
|
|
|
## ----------------------------------
|
|
## INTERFACES & CABLING
|
|
## ----------------------------------
|
|
|
|
@error_handler
|
|
def get_or_create_interface(self, device_id: int, if_name: str,
|
|
if_type: str = InterfaceTypes.QSFPP.value):
|
|
"""
|
|
Retrieves or creates an interface on a given device.
|
|
|
|
Args:
|
|
device_id (int): Device ID
|
|
if_name (str): Interface name
|
|
if_type (str): Interface type
|
|
"""
|
|
self.validate_input(if_name=if_name)
|
|
|
|
intf = self.nb.dcim.interfaces.get(device_id=device_id, name=if_name)
|
|
if intf:
|
|
return intf
|
|
|
|
return self.nb.dcim.interfaces.create({
|
|
"device": device_id,
|
|
"name": if_name,
|
|
"type": if_type,
|
|
})
|
|
|
|
@error_handler
|
|
def create_cable_if_not_exists(self, intf_a, intf_b):
|
|
"""
|
|
Creates a cable between two interfaces if it doesn't exist.
|
|
|
|
Args:
|
|
intf_a: First interface object
|
|
intf_b: Second interface object
|
|
"""
|
|
if not all([intf_a, intf_b]):
|
|
logger.warning("Missing interfaces to create cable")
|
|
return None
|
|
|
|
return self.nb.dcim.cables.create({
|
|
"a_terminations": [{"object_type": "dcim.interface", "object_id": intf_a.id}],
|
|
"b_terminations": [{"object_type": "dcim.interface", "object_id": intf_b.id}],
|
|
"status": "connected",
|
|
})
|
|
|
|
## ----------------------------------
|
|
## NETWORK MANAGEMENT
|
|
## ----------------------------------
|
|
|
|
@error_handler
|
|
def create_vlan(self, vlan_id: int, vlan_name: str, slug: str, tenant_id: str):
|
|
"""Creates a VLAN in NetBox."""
|
|
self.validate_input(vlan_name=vlan_name, slug=slug)
|
|
return self.nb.ipam.vlans.create({
|
|
"vid": vlan_id,
|
|
"name": vlan_name,
|
|
"slug": slug,
|
|
"tenant": tenant_id,
|
|
})
|
|
|
|
@error_handler
|
|
def create_l2vpn(self, vni_id: int, vpn_name: str, slug: str, tenant_id: str):
|
|
"""Creates an L2VPN in NetBox."""
|
|
self.validate_input(vpn_name=vpn_name, slug=slug)
|
|
return self.nb.vpn.l2vpns.create({
|
|
"name": vpn_name,
|
|
"slug": slug,
|
|
"type": "vxlan-evpn",
|
|
"tenant": tenant_id,
|
|
"identifier": vni_id
|
|
})
|
|
|
|
@error_handler
|
|
def create_vxlan_termination(self, l2vpn_id: int, assigned_object_type: str,
|
|
assigned_object_id: int):
|
|
"""Creates a VXLAN termination for L2VPN."""
|
|
return self.nb.vpn.l2vpn_terminations.create({
|
|
"l2vpn": l2vpn_id,
|
|
"assigned_object_type": assigned_object_type,
|
|
"assigned_object_id": assigned_object_id
|
|
})
|
|
|
|
@error_handler
|
|
def allocate_prefix(self, parent_prefix, prefix_length: int,
|
|
site_id: int, role_id: int, tenant_id: int):
|
|
"""
|
|
Allocates a child subnet from a parent prefix.
|
|
|
|
Args:
|
|
parent_prefix: Parent prefix object
|
|
prefix_length (int): Length of the child prefix
|
|
site_id (int): Site ID
|
|
role_id (int): Role ID
|
|
tenant_id (int): Tenant ID
|
|
"""
|
|
return parent_prefix.available_prefixes.create({
|
|
"prefix_length": prefix_length,
|
|
"site": site_id,
|
|
"role": role_id,
|
|
"tenant": tenant_id
|
|
})
|
|
|
|
@error_handler
|
|
def assign_ip_to_interface(self, interface, ip_address: str,
|
|
status: str = DeviceStatus.ACTIVE.value):
|
|
"""Assigns an IP address to an interface."""
|
|
self.validate_input(ip_address=ip_address)
|
|
return self.nb.ipam.ip_addresses.create({
|
|
"address": ip_address,
|
|
"assigned_object_id": interface.id,
|
|
"assigned_object_type": "dcim.interface",
|
|
"status": status,
|
|
})
|
|
|
|
@error_handler
|
|
def get_available_ips_in_prefix(self, prefix) -> List:
|
|
"""Fetches available IPs within a prefix."""
|
|
if not hasattr(prefix, "available_ips"):
|
|
logger.error(f"Invalid prefix object: {prefix}")
|
|
return []
|
|
return list(prefix.available_ips.list())
|
|
|
|
@error_handler
|
|
def save_custom_fields(self, device, fields: Dict[str, Any]) -> bool:
|
|
"""
|
|
Saves custom fields for a device.
|
|
|
|
Args:
|
|
device: Device object
|
|
fields (Dict[str, Any]): Custom fields to save
|
|
"""
|
|
for key, value in fields.items():
|
|
device.custom_fields[key] = value
|
|
device.save()
|
|
return True
|
|
|
|
@error_handler
|
|
def cleanup_unused_devices(self, older_than_days: int = 30):
|
|
"""
|
|
Removes devices that haven't been updated in specified days.
|
|
|
|
Args:
|
|
older_than_days (int): Number of days of inactivity
|
|
"""
|
|
cutoff = datetime.now() - timedelta(days=older_than_days)
|
|
devices = self.nb.dcim.devices.filter(last_updated__lt=cutoff)
|
|
for device in devices:
|
|
device.delete() |