feat: add PostgreSQL backup flow
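
Adds flows/backup.py: a Prefect flow that runs pg_dump, pipes the dump
through gzip, and uploads the archive to an S3-compatible bucket (Garage),
with credentials loaded from a Prefect AwsCredentials block.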

2026-01-31 15:42:32 +00:00
parent c88ed71e6b
commit 8e94efa3e6

flows/backup.py

@@ -0,0 +1,175 @@
"""
PostgreSQL Backup Flow
Sauvegarde une base PostgreSQL vers S3 (Garage).
"""
import subprocess
from datetime import datetime
from pathlib import Path
from tempfile import TemporaryDirectory

import boto3
from prefect import flow, task, get_run_logger
from prefect_aws import AwsCredentials
@task(name="pg_dump")
def dump_database(
host: str,
port: int,
database: str,
user: str,
password: str,
output_dir: Path,
) -> Path:
"""Exécute pg_dump et retourne le chemin du fichier."""
logger = get_run_logger()
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{database}_{timestamp}.sql.gz"
output_path = output_dir / filename
logger.info(f"Dumping database {database} from {host}:{port}")
env = {
"PGPASSWORD": password,
"PATH": "/usr/bin:/bin",
}
    # Plain-format dump, piped through gzip for compression.
    # --no-owner/--no-acl keep ownership and GRANT statements out of the
    # dump, so it can be restored under a different role.
    cmd = [
        "pg_dump",
        "-h", host,
        "-p", str(port),
        "-U", user,
        "-d", database,
        "--format=plain",
        "--no-owner",
        "--no-acl",
    ]
    with open(output_path, "wb") as f:
        dump_process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=env,
        )
        gzip_process = subprocess.Popen(
            ["gzip", "-c"],
            stdin=dump_process.stdout,
            stdout=f,
            stderr=subprocess.PIPE,
        )
        # Close our handle on the pipe so pg_dump receives SIGPIPE
        # if gzip exits early.
        dump_process.stdout.close()
        # Drain pg_dump's stderr while gzip consumes its stdout (its stdout
        # handle is already closed, so communicate() is not usable here),
        # then reap both processes.
        dump_err = dump_process.stderr.read()
        dump_process.wait()
        _, gzip_err = gzip_process.communicate()
        if dump_process.returncode != 0:
            raise RuntimeError(f"pg_dump failed: {dump_err.decode()}")
        if gzip_process.returncode != 0:
            raise RuntimeError(f"gzip failed: {gzip_err.decode()}")
    file_size = output_path.stat().st_size / (1024 * 1024)
    logger.info(f"Dump completed: {filename} ({file_size:.2f} MB)")
    return output_path
@task(name="upload_to_s3")
def upload_to_s3(
file_path: Path,
bucket: str,
prefix: str,
credentials: AwsCredentials,
) -> str:
"""Upload le fichier vers S3 et retourne l'URI."""
logger = get_run_logger()
s3_client = boto3.client(
"s3",
aws_access_key_id=credentials.aws_access_key_id,
aws_secret_access_key=credentials.aws_secret_access_key.get_secret_value(),
endpoint_url=credentials.aws_endpoint_url,
)
key = f"{prefix}/{file_path.name}" if prefix else file_path.name
logger.info(f"Uploading to s3://{bucket}/{key}")
s3_client.upload_file(str(file_path), bucket, key)
s3_uri = f"s3://{bucket}/{key}"
logger.info(f"Upload completed: {s3_uri}")
return s3_uri
@flow(name="pg-backup")
def pg_backup(
# PostgreSQL connection
pg_host: str,
pg_port: int = 5432,
pg_database: str = "postgres",
pg_user: str = "postgres",
pg_password: str = "",
# S3 destination
s3_bucket: str = "backups",
s3_prefix: str = "postgresql",
# Prefect Block name for AWS credentials
aws_credentials_block: str = "garage-credentials",
) -> str:
"""
Flow principal de backup PostgreSQL vers S3.
Args:
pg_host: Hôte PostgreSQL
pg_port: Port PostgreSQL
pg_database: Nom de la base à sauvegarder
pg_user: Utilisateur PostgreSQL
pg_password: Mot de passe PostgreSQL
s3_bucket: Bucket S3 de destination
s3_prefix: Préfixe (dossier) dans le bucket
aws_credentials_block: Nom du block Prefect contenant les credentials AWS/S3
Returns:
URI S3 du backup
"""
logger = get_run_logger()
logger.info(f"Starting backup of {pg_database}")
# Charger les credentials S3 depuis le block Prefect
credentials = AwsCredentials.load(aws_credentials_block)
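    # NOTE: the block must be registered once beforehand, e.g. from a
    # bootstrap script (values below are illustrative, not from this repo):
    #
    #   AwsCredentials(
    #       aws_access_key_id="GK...",
    #       aws_secret_access_key="...",
    #       aws_client_parameters=AwsClientParameters(endpoint_url="https://s3.example.net"),
    #   ).save("garage-credentials")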
    with TemporaryDirectory() as tmpdir:
        # Dump the database
        dump_path = dump_database(
            host=pg_host,
            port=pg_port,
            database=pg_database,
            user=pg_user,
            password=pg_password,
            output_dir=Path(tmpdir),
        )
        # Upload to S3
        s3_uri = upload_to_s3(
            file_path=dump_path,
            bucket=s3_bucket,
            prefix=s3_prefix,
            credentials=credentials,
        )
    logger.info(f"Backup completed: {s3_uri}")
    return s3_uri


if __name__ == "__main__":
    # Local test
    pg_backup(
        pg_host="postgresql.taila5ad8.ts.net",
        pg_database="netbox",
        pg_user="netbox",
        pg_password="test",
    )
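
For recurring backups, the flow could be served on a cron schedule instead of the one-off call above; a minimal sketch, assuming the Prefect 2.x flow.serve() API (deployment name and schedule are illustrative, and the password is deliberately left out so it can come from a secret in practice):

if __name__ == "__main__":
    # Serve the flow with a nightly schedule (03:00 UTC)
    pg_backup.serve(
        name="nightly-netbox-backup",
        cron="0 3 * * *",
        parameters={
            "pg_host": "postgresql.taila5ad8.ts.net",
            "pg_database": "netbox",
            "pg_user": "netbox",
        },
    )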