"""
PostgreSQL Backup Flow

Backs up a PostgreSQL database to S3 (Garage).
"""

import gzip
import subprocess
from datetime import datetime
from pathlib import Path
from tempfile import TemporaryDirectory

from prefect import flow, task, get_run_logger
from prefect_aws import AwsCredentials
import boto3


@task(name="pg_dump")
def dump_database(
    host: str,
    port: int,
    database: str,
    user: str,
    password: str,
    output_dir: Path,
) -> Path:
    """Run ``pg_dump`` and return the path of the gzip-compressed dump.

    Args:
        host: PostgreSQL host.
        port: PostgreSQL port.
        database: Name of the database to dump.
        user: PostgreSQL user.
        password: PostgreSQL password (handed over via ``PGPASSWORD``).
        output_dir: Directory the dump file is written into.

    Returns:
        Path to the created ``<database>_<timestamp>.sql.gz`` file.

    Raises:
        RuntimeError: If ``pg_dump`` exits with a non-zero status.
    """
    logger = get_run_logger()

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"{database}_{timestamp}.sql.gz"
    output_path = output_dir / filename

    logger.info(f"Dumping database {database} from {host}:{port}")

    # Deliberately minimal environment: the password travels through
    # PGPASSWORD so it never appears on the command line (visible in `ps`).
    env = {
        "PGPASSWORD": password,
        "PATH": "/usr/bin:/bin",
    }

    cmd = [
        "pg_dump",
        "-h", host,
        "-p", str(port),
        "-U", user,
        "-d", database,
        "--format=plain",
        "--no-owner",
        "--no-acl",
    ]

    # Stream pg_dump's stdout through the stdlib gzip module instead of a
    # second `gzip` subprocess. The previous two-process pipeline could
    # deadlock (both children had stderr=PIPE, and gzip was awaited first,
    # so a full pg_dump stderr pipe stalled everything) and it called
    # communicate() on an already-closed stdout.
    dump_process = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=env,
    )
    try:
        with gzip.open(output_path, "wb") as gz:
            # Read in 1 MiB chunks so large databases are never fully
            # buffered in memory.
            for chunk in iter(lambda: dump_process.stdout.read(1024 * 1024), b""):
                gz.write(chunk)
    finally:
        # Drain stderr and reap the child even if compression failed.
        _, dump_err = dump_process.communicate()

    if dump_process.returncode != 0:
        raise RuntimeError(f"pg_dump failed: {dump_err.decode()}")

    file_size = output_path.stat().st_size / (1024 * 1024)
    # Fixed: the original log line had a corrupted "(unknown)" placeholder
    # where the dump path belongs.
    logger.info(f"Dump completed: {output_path} ({file_size:.2f} MB)")

    return output_path


@task(name="upload_to_s3")
def upload_to_s3(
    file_path: Path,
    bucket: str,
    prefix: str,
    credentials: AwsCredentials,
) -> str:
    """Upload the file to S3 and return its S3 URI.

    Args:
        file_path: Local file to upload.
        bucket: Destination S3 bucket.
        prefix: Key prefix ("folder") inside the bucket; may be empty.
        credentials: AWS/S3 credentials block (endpoint_url supports
            S3-compatible targets such as Garage).

    Returns:
        The ``s3://bucket/key`` URI of the uploaded object.
    """
    logger = get_run_logger()

    s3_client = boto3.client(
        "s3",
        aws_access_key_id=credentials.aws_access_key_id,
        aws_secret_access_key=credentials.aws_secret_access_key.get_secret_value(),
        endpoint_url=credentials.aws_endpoint_url,
    )

    key = f"{prefix}/{file_path.name}" if prefix else file_path.name

    logger.info(f"Uploading to s3://{bucket}/{key}")

    s3_client.upload_file(str(file_path), bucket, key)

    s3_uri = f"s3://{bucket}/{key}"
    logger.info(f"Upload completed: {s3_uri}")

    return s3_uri


@flow(name="pg-backup")
def pg_backup(
    # PostgreSQL connection
    pg_host: str,
    pg_port: int = 5432,
    pg_database: str = "postgres",
    pg_user: str = "postgres",
    pg_password: str = "",
    # S3 destination
    s3_bucket: str = "backups",
    s3_prefix: str = "postgresql",
    # Prefect Block name for AWS credentials
    aws_credentials_block: str = "garage-credentials",
) -> str:
    """
    Main flow: back up a PostgreSQL database to S3.

    Args:
        pg_host: PostgreSQL host.
        pg_port: PostgreSQL port.
        pg_database: Name of the database to back up.
        pg_user: PostgreSQL user.
        pg_password: PostgreSQL password.
        s3_bucket: Destination S3 bucket.
        s3_prefix: Prefix (folder) inside the bucket.
        aws_credentials_block: Name of the Prefect block holding the
            AWS/S3 credentials.

    Returns:
        S3 URI of the backup.
    """
    logger = get_run_logger()
    logger.info(f"Starting backup of {pg_database}")

    # Load the S3 credentials from the Prefect block.
    credentials = AwsCredentials.load(aws_credentials_block)

    # TemporaryDirectory guarantees the local dump is removed even when
    # the upload fails.
    with TemporaryDirectory() as tmpdir:
        # Dump the database
        dump_path = dump_database(
            host=pg_host,
            port=pg_port,
            database=pg_database,
            user=pg_user,
            password=pg_password,
            output_dir=Path(tmpdir),
        )

        # Upload to S3
        s3_uri = upload_to_s3(
            file_path=dump_path,
            bucket=s3_bucket,
            prefix=s3_prefix,
            credentials=credentials,
        )

    logger.info(f"Backup completed: {s3_uri}")
    return s3_uri


if __name__ == "__main__":
    # Local test run
    pg_backup(
        pg_host="postgresql.taila5ad8.ts.net",
        pg_database="netbox",
        pg_user="netbox",
        pg_password="test",
    )