""" PostgreSQL Backup Flow Sauvegarde une base PostgreSQL vers S3 (Garage). """ import subprocess from datetime import datetime from pathlib import Path from tempfile import TemporaryDirectory from prefect import flow, task, get_run_logger from prefect.blocks.system import Secret from prefect_aws import AwsCredentials @task(name="pg_dump") def dump_database( host: str, port: int, database: str, user: str, password: str, output_dir: Path, ) -> Path: """Exécute pg_dump et retourne le chemin du fichier.""" logger = get_run_logger() timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"{database}_{timestamp}.sql.gz" output_path = output_dir / filename logger.info(f"Dumping database {database} from {host}:{port}") env = { "PGPASSWORD": password, "PATH": "/usr/bin:/bin", } # pg_dump avec compression gzip cmd = [ "pg_dump", "-h", host, "-p", str(port), "-U", user, "-d", database, "--format=plain", "--no-owner", "--no-acl", ] with open(output_path, "wb") as f: dump_process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, ) gzip_process = subprocess.Popen( ["gzip", "-c"], stdin=dump_process.stdout, stdout=f, stderr=subprocess.PIPE, ) dump_process.stdout.close() _, gzip_err = gzip_process.communicate() _, dump_err = dump_process.communicate() if dump_process.returncode != 0: raise RuntimeError(f"pg_dump failed: {dump_err.decode()}") if gzip_process.returncode != 0: raise RuntimeError(f"gzip failed: {gzip_err.decode()}") file_size = output_path.stat().st_size / (1024 * 1024) logger.info(f"Dump completed: {filename} ({file_size:.2f} MB)") return output_path @task(name="upload_to_s3") def upload_to_s3( file_path: Path, bucket: str, prefix: str, credentials: AwsCredentials, ) -> str: """Upload le fichier vers S3 et retourne l'URI.""" logger = get_run_logger() # Utiliser get_s3_client() qui gère automatiquement l'endpoint s3_client = credentials.get_s3_client() key = f"{prefix}/{file_path.name}" if prefix else file_path.name logger.info(f"Uploading to s3://{bucket}/{key}") s3_client.upload_file(str(file_path), bucket, key) s3_uri = f"s3://{bucket}/{key}" logger.info(f"Upload completed: {s3_uri}") return s3_uri @flow(name="pg-backup") def pg_backup( # PostgreSQL connection pg_host: str, pg_port: int = 5432, pg_database: str = "postgres", pg_user: str = "postgres", # S3 destination s3_bucket: str = "postgres-backup", s3_prefix: str = "default", # Prefect Block names aws_credentials_block: str = "garage-credentials", pg_password_block: str = "netbox-db-password", ) -> str: """ Flow principal de backup PostgreSQL vers S3. 
Args: pg_host: Hôte PostgreSQL pg_port: Port PostgreSQL pg_database: Nom de la base à sauvegarder pg_user: Utilisateur PostgreSQL s3_bucket: Bucket S3 de destination s3_prefix: Préfixe (dossier) dans le bucket aws_credentials_block: Nom du block Prefect contenant les credentials AWS/S3 pg_password_block: Nom du block Secret contenant le mot de passe PostgreSQL Returns: URI S3 du backup """ logger = get_run_logger() logger.info(f"Starting backup of {pg_database}") # Charger les credentials depuis les blocks Prefect credentials = AwsCredentials.load(aws_credentials_block) pg_password = Secret.load(pg_password_block).get() with TemporaryDirectory() as tmpdir: # Dump de la base dump_path = dump_database( host=pg_host, port=pg_port, database=pg_database, user=pg_user, password=pg_password, output_dir=Path(tmpdir), ) # Upload vers S3 s3_uri = upload_to_s3( file_path=dump_path, bucket=s3_bucket, prefix=s3_prefix, credentials=credentials, ) logger.info(f"Backup completed: {s3_uri}") return s3_uri if __name__ == "__main__": # Test local pg_backup( pg_host="postgresql.taila5ad8.ts.net", pg_database="netbox", pg_user="netbox", )
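
# A minimal scheduling sketch, left commented out so it does not conflict
# with the local-test entrypoint above. Assumption: a Prefect version where
# Flow.serve() accepts a cron schedule and a parameters dict. The deployment
# name and cron expression below are illustrative, not part of the original
# setup; swapping this in for the __main__ block above would register the
# flow and run the netbox backup nightly.
#
# if __name__ == "__main__":
#     pg_backup.serve(
#         name="pg-backup-netbox-nightly",  # hypothetical deployment name
#         cron="0 3 * * *",                 # every day at 03:00
#         parameters={
#             "pg_host": "postgresql.taila5ad8.ts.net",
#             "pg_database": "netbox",
#             "pg_user": "netbox",
#         },
#     )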