Files
prefect-flows-pg-backup/flows/backup.py
Damien Arnodo 509afbcbb4
Some checks failed
Deploy Prefect Flows / deploy (push) Has been cancelled
feat: load PostgreSQL password from Secret block instead of parameter
2026-02-01 10:15:53 +00:00

177 lines
4.7 KiB
Python

"""
PostgreSQL Backup Flow
Sauvegarde une base PostgreSQL vers S3 (Garage).
"""
import subprocess
from datetime import datetime
from pathlib import Path
from tempfile import TemporaryDirectory
from prefect import flow, task, get_run_logger
from prefect.blocks.system import Secret
from prefect_aws import AwsCredentials
import boto3
@task(name="pg_dump")
def dump_database(
    host: str,
    port: int,
    database: str,
    user: str,
    password: str,
    output_dir: Path,
) -> Path:
    """Run ``pg_dump`` piped through ``gzip`` and return the archive path.

    The dump is written to ``<output_dir>/<database>_<YYYYmmdd_HHMMSS>.sql.gz``.
    The password is handed to ``pg_dump`` through the ``PGPASSWORD``
    environment variable rather than on the command line.

    Args:
        host: PostgreSQL server host name.
        port: PostgreSQL server port.
        database: Name of the database to dump (also used in the file name).
        user: PostgreSQL user to connect as.
        password: Password for ``user``.
        output_dir: Existing directory in which the archive is created.

    Returns:
        Path of the gzip-compressed SQL dump.

    Raises:
        RuntimeError: If ``pg_dump`` or ``gzip`` exits with a non-zero status.
        OSError: If either executable cannot be started.
    """
    # Function-scope import: only this block needs TemporaryFile.
    from tempfile import TemporaryFile

    logger = get_run_logger()

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"{database}_{timestamp}.sql.gz"
    output_path = output_dir / filename

    logger.info(f"Dumping database {database} from {host}:{port}")

    # pg_dump runs with an environment holding only these two variables.
    env = {
        "PGPASSWORD": password,
        "PATH": "/usr/bin:/bin",
    }

    # Plain-format dump; compression is done by the downstream gzip process.
    cmd = [
        "pg_dump",
        "-h", host,
        "-p", str(port),
        "-U", user,
        "-d", database,
        "--format=plain",
        "--no-owner",
        "--no-acl",
    ]

    # pg_dump's stderr goes to a temporary file instead of a pipe: nothing
    # drains that stream while we wait on gzip, so a pipe could fill up,
    # stall pg_dump, and deadlock the whole pipeline.
    with open(output_path, "wb") as f, TemporaryFile() as dump_stderr:
        dump_process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=dump_stderr,
            env=env,
        )
        try:
            gzip_process = subprocess.Popen(
                ["gzip", "-c"],
                stdin=dump_process.stdout,
                stdout=f,
                stderr=subprocess.PIPE,
            )
        except OSError:
            # gzip could not be started: do not leave pg_dump orphaned.
            dump_process.kill()
            dump_process.stdout.close()
            dump_process.wait()
            raise

        # Drop our copy of the pipe so pg_dump receives SIGPIPE if gzip dies.
        dump_process.stdout.close()

        _, gzip_err = gzip_process.communicate()
        dump_process.wait()

        dump_stderr.seek(0)
        dump_err = dump_stderr.read()

    # Decode leniently so an oddly-encoded server message cannot mask the
    # real failure with a UnicodeDecodeError.
    if dump_process.returncode != 0:
        raise RuntimeError(
            f"pg_dump failed: {dump_err.decode(errors='replace')}"
        )
    if gzip_process.returncode != 0:
        raise RuntimeError(
            f"gzip failed: {gzip_err.decode(errors='replace')}"
        )

    file_size = output_path.stat().st_size / (1024 * 1024)
    logger.info(f"Dump completed: {filename} ({file_size:.2f} MB)")

    return output_path
@task(name="upload_to_s3")
def upload_to_s3(
    file_path: Path,
    bucket: str,
    prefix: str,
    credentials: AwsCredentials,
) -> str:
    """Upload a local file to an S3 bucket and return its ``s3://`` URI.

    Args:
        file_path: Local file to upload; its base name is the last component
            of the object key.
        bucket: Destination bucket name.
        prefix: Key prefix joined to the file name with ``/``. When empty,
            the object key is the bare file name.
        credentials: Block supplying the access key id, secret access key
            and endpoint URL used to build the S3 client.

    Returns:
        The ``s3://<bucket>/<key>`` URI of the uploaded object.
    """
    log = get_run_logger()

    client_kwargs = {
        "aws_access_key_id": credentials.aws_access_key_id,
        "aws_secret_access_key": credentials.aws_secret_access_key.get_secret_value(),
        "endpoint_url": credentials.aws_endpoint_url,
    }
    client = boto3.client("s3", **client_kwargs)

    if prefix:
        object_key = f"{prefix}/{file_path.name}"
    else:
        object_key = file_path.name
    destination = f"s3://{bucket}/{object_key}"

    log.info(f"Uploading to {destination}")
    client.upload_file(str(file_path), bucket, object_key)
    log.info(f"Upload completed: {destination}")

    return destination
@flow(name="pg-backup")
def pg_backup(
    # PostgreSQL connection
    pg_host: str,
    pg_port: int = 5432,
    pg_database: str = "postgres",
    pg_user: str = "postgres",
    # S3 destination
    s3_bucket: str = "postgres-backup",
    s3_prefix: str = "default",
    # Prefect Block names
    aws_credentials_block: str = "garage-credentials",
    pg_password_block: str = "netbox-db-password",
) -> str:
    """Back up a PostgreSQL database to S3.

    Loads the S3 credentials and the database password from Prefect blocks,
    dumps the database into a temporary directory, uploads the resulting
    file, and returns where it was stored.

    Args:
        pg_host: PostgreSQL host.
        pg_port: PostgreSQL port.
        pg_database: Name of the database to back up.
        pg_user: PostgreSQL user.
        s3_bucket: Destination S3 bucket.
        s3_prefix: Prefix (folder) inside the bucket.
        aws_credentials_block: Name of the Prefect block holding the AWS/S3
            credentials.
        pg_password_block: Name of the Secret block holding the PostgreSQL
            password.

    Returns:
        S3 URI of the backup.
    """
    log = get_run_logger()
    log.info(f"Starting backup of {pg_database}")

    # Resolve both Prefect blocks up front, before any dump work starts.
    s3_credentials = AwsCredentials.load(aws_credentials_block)
    db_password = Secret.load(pg_password_block).get()

    # The dump only lives in a scratch directory for the duration of the upload.
    with TemporaryDirectory() as scratch:
        archive_path = dump_database(
            host=pg_host,
            port=pg_port,
            database=pg_database,
            user=pg_user,
            password=db_password,
            output_dir=Path(scratch),
        )
        backup_uri = upload_to_s3(
            file_path=archive_path,
            bucket=s3_bucket,
            prefix=s3_prefix,
            credentials=s3_credentials,
        )

    log.info(f"Backup completed: {backup_uri}")
    return backup_uri
if __name__ == "__main__":
    # Local test run: every other flow parameter keeps its default.
    local_run_params = {
        "pg_host": "postgresql.taila5ad8.ts.net",
        "pg_database": "netbox",
        "pg_user": "netbox",
    }
    pg_backup(**local_run_params)