Python ile Veritabanı Yedekleme Otomasyonu

Veritabanı yedeklemesi konusunda “elle yapacağım” diyenler, er ya da geç bir gün o kritik yedeği almayı unuttuklarını fark ederler. Genellikle de en kötü zamanda. Python ile bu süreci otomatize etmek hem güvenilir hem de son derece esnek bir çözüm sunuyor. Bu yazıda sıfırdan başlayarak production ortamında kullanabileceğiniz bir yedekleme sistemi kuracağız.

Neden Python ile Yedekleme Otomasyonu?

Bash scriptleri yedekleme için yeterli gibi görünse de Python birkaç önemli avantaj sunuyor. Hata yönetimi çok daha güçlü, loglama mekanizması kurması kolay, birden fazla veritabanı tipini aynı script içinde yönetebiliyorsunuz ve e-posta/Slack bildirimleri entegre etmek çok daha az zahmetli.

Benim tercihim genellikle şu şekilde: basit tek seferlik işler için Bash, haftalar boyunca büyüyecek ve gelişecek otomasyon sistemleri için Python.

Ortamı Hazırlamak

Önce gerekli araçları kuralım. PostgreSQL ve MySQL yedeklemelerini ele alacağız.

# Ubuntu/Debian
sudo apt-get install postgresql-client mysql-client

# Python bağımlılıkları
pip install python-dotenv boto3 paramiko schedule

# Sanal ortam oluşturmanızı şiddetle öneririm
python3 -m venv /opt/db-backup/venv
source /opt/db-backup/venv/bin/activate
pip install python-dotenv boto3 schedule

Proje dizin yapısını da şimdiden düzenli tutmak işinizi kolaylaştırır:

mkdir -p /opt/db-backup/{logs,backups,config}
touch /opt/db-backup/config/.env
chmod 600 /opt/db-backup/config/.env

Yapılandırma Dosyası

Şifreler ve bağlantı bilgileri asla kod içinde olmamalı. .env dosyası kullanacağız:

# /opt/db-backup/config/.env

# PostgreSQL
PG_HOST=localhost
PG_PORT=5432
PG_USER=backup_user
PG_PASSWORD=guclu_bir_sifre
PG_DATABASES=myapp_db,analytics_db,users_db

# MySQL
MYSQL_HOST=localhost
MYSQL_PORT=3306
MYSQL_USER=backup_user
MYSQL_PASSWORD=baska_bir_guclu_sifre
MYSQL_DATABASES=wordpress_db,ecommerce_db

# Yedek ayarları
BACKUP_DIR=/opt/db-backup/backups
RETENTION_DAYS=30
COMPRESS=true

# AWS S3 (opsiyonel)
AWS_BUCKET=sirket-db-backuplari
AWS_REGION=eu-central-1

# Bildirim
SMTP_HOST=smtp.sirket.com
SMTP_PORT=587
[email protected]
SMTP_PASSWORD=smtp_sifresi
[email protected]

Ana Yedekleme Modülü

Şimdi asıl işi yapacak olan temel modülü yazalım. Bu yapıyı modüler tutmak önemli, çünkü ileride yeni veritabanı tipleri eklemek isteyeceksiniz:

# /opt/db-backup/backup_manager.py

import os
import subprocess
import gzip
import shutil
import logging
from datetime import datetime
from pathlib import Path
from dotenv import load_dotenv

# Yapılandırmayı yükle
load_dotenv('/opt/db-backup/config/.env')

# Loglama ayarları
log_dir = Path('/opt/db-backup/logs')
log_dir.mkdir(exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_dir / f'backup_{datetime.now().strftime("%Y%m")}.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class DatabaseBackup:
    def __init__(self):
        self.backup_dir = Path(os.getenv('BACKUP_DIR', '/opt/db-backup/backups'))
        self.backup_dir.mkdir(parents=True, exist_ok=True)
        self.compress = os.getenv('COMPRESS', 'true').lower() == 'true'
        self.timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        self.results = []

    def _create_backup_path(self, db_type, db_name):
        """Veritabanı tipine göre dizin yolu oluştur"""
        path = self.backup_dir / db_type / datetime.now().strftime('%Y/%m/%d')
        path.mkdir(parents=True, exist_ok=True)
        return path

    def _compress_file(self, filepath):
        """Dosyayı gzip ile sıkıştır"""
        gz_path = f"{filepath}.gz"
        with open(filepath, 'rb') as f_in:
            with gzip.open(gz_path, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
        os.remove(filepath)
        logger.info(f"Sıkıştırıldı: {gz_path}")
        return gz_path

    def backup_postgresql(self, host, port, user, password, db_name):
        """PostgreSQL veritabanını yedekle"""
        backup_path = self._create_backup_path('postgresql', db_name)
        filename = f"{db_name}_{self.timestamp}.sql"
        full_path = backup_path / filename

        env = os.environ.copy()
        env['PGPASSWORD'] = password

        cmd = [
            'pg_dump',
            '-h', host,
            '-p', str(port),
            '-U', user,
            '-d', db_name,
            '--no-password',
            '--verbose',
            '-f', str(full_path)
        ]

        logger.info(f"PostgreSQL yedekleme başlıyor: {db_name}")

        try:
            result = subprocess.run(
                cmd,
                env=env,
                capture_output=True,
                text=True,
                timeout=3600  # 1 saat timeout
            )

            if result.returncode != 0:
                raise Exception(f"pg_dump hatası: {result.stderr}")

            file_size = os.path.getsize(full_path)

            if self.compress:
                full_path = self._compress_file(str(full_path))

            logger.info(f"Başarılı: {db_name} - {file_size / 1024 / 1024:.2f} MB")
            self.results.append({
                'db': db_name,
                'type': 'postgresql',
                'status': 'success',
                'path': str(full_path),
                'size_mb': round(file_size / 1024 / 1024, 2)
            })
            return str(full_path)

        except Exception as e:
            logger.error(f"PostgreSQL yedekleme hatası ({db_name}): {str(e)}")
            self.results.append({
                'db': db_name,
                'type': 'postgresql',
                'status': 'failed',
                'error': str(e)
            })
            return None

    def backup_mysql(self, host, port, user, password, db_name):
        """MySQL veritabanını yedekle"""
        backup_path = self._create_backup_path('mysql', db_name)
        filename = f"{db_name}_{self.timestamp}.sql"
        full_path = backup_path / filename

        cmd = [
            'mysqldump',
            f'--host={host}',
            f'--port={port}',
            f'--user={user}',
            f'--password={password}',
            '--single-transaction',
            '--routines',
            '--triggers',
            '--events',
            db_name
        ]

        logger.info(f"MySQL yedekleme başlıyor: {db_name}")

        try:
            with open(full_path, 'w') as output_file:
                result = subprocess.run(
                    cmd,
                    stdout=output_file,
                    stderr=subprocess.PIPE,
                    text=True,
                    timeout=3600
                )

            if result.returncode != 0:
                raise Exception(f"mysqldump hatası: {result.stderr}")

            file_size = os.path.getsize(full_path)

            if self.compress:
                full_path = self._compress_file(str(full_path))

            logger.info(f"Başarılı: {db_name} - {file_size / 1024 / 1024:.2f} MB")
            self.results.append({
                'db': db_name,
                'type': 'mysql',
                'status': 'success',
                'path': str(full_path),
                'size_mb': round(file_size / 1024 / 1024, 2)
            })
            return str(full_path)

        except Exception as e:
            logger.error(f"MySQL yedekleme hatası ({db_name}): {str(e)}")
            self.results.append({
                'db': db_name,
                'type': 'mysql',
                'status': 'failed',
                'error': str(e)
            })
            return None

Yedek Temizleme (Retention Policy)

Yedeğin dolmasını engellemek için eski yedekleri düzenli silmek zorunlu. Özellikle büyük veritabanlarında disk bu konuda affetmez:

# retention_manager.py

import os
import time
import logging
from pathlib import Path
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)


class RetentionManager:
    def __init__(self, backup_dir, retention_days=30):
        self.backup_dir = Path(backup_dir)
        self.retention_days = retention_days

    def cleanup_old_backups(self):
        """Belirtilen günden eski yedekleri sil"""
        cutoff_time = time.time() - (self.retention_days * 86400)
        deleted_count = 0
        freed_space = 0

        logger.info(f"Retention temizliği başlıyor: {self.retention_days} günden eski yedekler silinecek")

        for backup_file in self.backup_dir.rglob('*.sql*'):
            try:
                file_mtime = os.path.getmtime(backup_file)
                if file_mtime < cutoff_time:
                    file_size = os.path.getsize(backup_file)
                    backup_file.unlink()
                    deleted_count += 1
                    freed_space += file_size
                    logger.info(f"Silindi: {backup_file}")
            except Exception as e:
                logger.error(f"Silme hatası ({backup_file}): {str(e)}")

        # Boş dizinleri de temizle
        self._remove_empty_dirs()

        freed_mb = freed_space / 1024 / 1024
        logger.info(f"Temizlik tamamlandı: {deleted_count} dosya silindi, {freed_mb:.2f} MB alan açıldı")
        return deleted_count, freed_mb

    def _remove_empty_dirs(self):
        """Boş klasörleri temizle"""
        for dirpath in sorted(self.backup_dir.rglob('*'), reverse=True):
            if dirpath.is_dir():
                try:
                    dirpath.rmdir()  # Sadece boş dizinleri siler
                    logger.debug(f"Boş dizin silindi: {dirpath}")
                except OSError:
                    pass  # Dolu dizinleri atla

    def get_backup_stats(self):
        """Mevcut yedeklerin istatistiklerini döndür"""
        total_size = 0
        file_count = 0
        oldest_file = None
        newest_file = None

        for backup_file in self.backup_dir.rglob('*.sql*'):
            file_size = os.path.getsize(backup_file)
            file_mtime = os.path.getmtime(backup_file)
            total_size += file_size
            file_count += 1

            if oldest_file is None or file_mtime < oldest_file[1]:
                oldest_file = (backup_file, file_mtime)
            if newest_file is None or file_mtime > newest_file[1]:
                newest_file = (backup_file, file_mtime)

        return {
            'total_size_gb': round(total_size / 1024 / 1024 / 1024, 2),
            'file_count': file_count,
            'oldest': str(oldest_file[0]) if oldest_file else None,
            'newest': str(newest_file[0]) if newest_file else None
        }

AWS S3’e Yükleme

Yerel diskler güvenilir değildir. Sunucu çökerse yedeğinizin de gitmesi büyük bir risk. Offsite backup için S3 entegrasyonu ekleyelim:

# s3_uploader.py

import boto3
import logging
import os
from pathlib import Path
from botocore.exceptions import ClientError, NoCredentialsError

logger = logging.getLogger(__name__)


class S3Uploader:
    def __init__(self, bucket_name, region='eu-central-1'):
        self.bucket_name = bucket_name
        self.region = region
        try:
            self.s3_client = boto3.client('s3', region_name=region)
            logger.info(f"S3 bağlantısı kuruldu: {bucket_name}")
        except NoCredentialsError:
            logger.error("AWS kimlik bilgileri bulunamadı!")
            raise

    def upload_backup(self, local_path, s3_prefix='database-backups'):
        """Yerel yedek dosyasını S3'e yükle"""
        local_path = Path(local_path)
        if not local_path.exists():
            logger.error(f"Dosya bulunamadı: {local_path}")
            return False

        # S3 anahtarı: database-backups/2024/01/15/dosya.sql.gz
        from datetime import datetime
        date_path = datetime.now().strftime('%Y/%m/%d')
        s3_key = f"{s3_prefix}/{date_path}/{local_path.name}"

        file_size = os.path.getsize(local_path)
        logger.info(f"S3 yükleme başlıyor: {local_path.name} ({file_size / 1024 / 1024:.2f} MB)")

        try:
            # Büyük dosyalar için multipart upload otomatik devreye girer
            self.s3_client.upload_file(
                str(local_path),
                self.bucket_name,
                s3_key,
                ExtraArgs={
                    'StorageClass': 'STANDARD_IA',  # Daha ucuz, daha az erişilen için
                    'ServerSideEncryption': 'AES256'
                }
            )
            logger.info(f"S3 yükleme başarılı: s3://{self.bucket_name}/{s3_key}")
            return s3_key

        except ClientError as e:
            logger.error(f"S3 yükleme hatası: {str(e)}")
            return False

    def list_backups(self, prefix='database-backups'):
        """S3'teki mevcut yedekleri listele"""
        try:
            response = self.s3_client.list_objects_v2(
                Bucket=self.bucket_name,
                Prefix=prefix
            )
            backups = []
            for obj in response.get('Contents', []):
                backups.append({
                    'key': obj['Key'],
                    'size_mb': round(obj['Size'] / 1024 / 1024, 2),
                    'last_modified': obj['LastModified'].strftime('%Y-%m-%d %H:%M:%S')
                })
            return backups
        except ClientError as e:
            logger.error(f"S3 listeleme hatası: {str(e)}")
            return []

E-posta Bildirimleri

Yedekleme başarıyla tamamlandı mı, hata var mı? Bunu her sabah kontrol etmek yerine sistemin size haber vermesini sağlayın:

# notifier.py

import smtplib
import logging
import os
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime

logger = logging.getLogger(__name__)


class BackupNotifier:
    def __init__(self):
        self.smtp_host = os.getenv('SMTP_HOST')
        self.smtp_port = int(os.getenv('SMTP_PORT', 587))
        self.smtp_user = os.getenv('SMTP_USER')
        self.smtp_password = os.getenv('SMTP_PASSWORD')
        self.alert_email = os.getenv('ALERT_EMAIL')

    def send_report(self, results, stats=None):
        """Yedekleme sonuç raporu gönder"""
        if not all([self.smtp_host, self.smtp_user, self.alert_email]):
            logger.warning("SMTP ayarları eksik, e-posta gönderilemiyor")
            return

        success_count = sum(1 for r in results if r['status'] == 'success')
        fail_count = sum(1 for r in results if r['status'] == 'failed')
        has_failures = fail_count > 0

        subject = f"[{'HATA' if has_failures else 'OK'}] Veritabanı Yedekleme Raporu - {datetime.now().strftime('%Y-%m-%d')}"

        body_lines = [
            f"<h2>Yedekleme Raporu - {datetime.now().strftime('%Y-%m-%d %H:%M')}</h2>",
            f"<p><strong>Toplam:</strong> {len(results)} veritabanı | "
            f"<strong>Başarılı:</strong> {success_count} | "
            f"<strong>Başarısız:</strong> {fail_count}</p>",
            "<hr><h3>Detaylar:</h3><ul>"
        ]

        for r in results:
            if r['status'] == 'success':
                body_lines.append(
                    f"<li style='color:green'>✓ {r['db']} ({r['type']}) - {r.get('size_mb', 0)} MB</li>"
                )
            else:
                body_lines.append(
                    f"<li style='color:red'>✗ {r['db']} ({r['type']}) - HATA: {r.get('error', 'Bilinmeyen hata')}</li>"
                )

        body_lines.append("</ul>")

        if stats:
            body_lines.append(
                f"<hr><p><strong>Disk Kullanımı:</strong> {stats.get('total_size_gb', 0)} GB | "
                f"<strong>Toplam Dosya:</strong> {stats.get('file_count', 0)}</p>"
            )

        body = ''.join(body_lines)

        msg = MIMEMultipart('alternative')
        msg['Subject'] = subject
        msg['From'] = self.smtp_user
        msg['To'] = self.alert_email
        msg.attach(MIMEText(body, 'html', 'utf-8'))

        try:
            with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
                server.starttls()
                server.login(self.smtp_user, self.smtp_password)
                server.send_message(msg)
            logger.info(f"Rapor e-postası gönderildi: {self.alert_email}")
        except Exception as e:
            logger.error(f"E-posta gönderme hatası: {str(e)}")

Ana Orkestrasyon Scripti

Tüm parçaları bir araya getiren ana script:

# /opt/db-backup/run_backup.py

import os
import sys
import logging
from dotenv import load_dotenv
from backup_manager import DatabaseBackup
from retention_manager import RetentionManager
from s3_uploader import S3Uploader
from notifier import BackupNotifier

load_dotenv('/opt/db-backup/config/.env')
logger = logging.getLogger(__name__)


def main():
    logger.info("=" * 50)
    logger.info("Veritabanı yedekleme başlıyor")
    logger.info("=" * 50)

    backup = DatabaseBackup()
    backup_files = []

    # PostgreSQL yedekleri
    pg_databases = os.getenv('PG_DATABASES', '').split(',')
    for db_name in filter(None, pg_databases):
        filepath = backup.backup_postgresql(
            host=os.getenv('PG_HOST', 'localhost'),
            port=int(os.getenv('PG_PORT', 5432)),
            user=os.getenv('PG_USER'),
            password=os.getenv('PG_PASSWORD'),
            db_name=db_name.strip()
        )
        if filepath:
            backup_files.append(filepath)

    # MySQL yedekleri
    mysql_databases = os.getenv('MYSQL_DATABASES', '').split(',')
    for db_name in filter(None, mysql_databases):
        filepath = backup.backup_mysql(
            host=os.getenv('MYSQL_HOST', 'localhost'),
            port=int(os.getenv('MYSQL_PORT', 3306)),
            user=os.getenv('MYSQL_USER'),
            password=os.getenv('MYSQL_PASSWORD'),
            db_name=db_name.strip()
        )
        if filepath:
            backup_files.append(filepath)

    # S3'e yükle
    aws_bucket = os.getenv('AWS_BUCKET')
    if aws_bucket and backup_files:
        try:
            uploader = S3Uploader(aws_bucket, os.getenv('AWS_REGION', 'eu-central-1'))
            for filepath in backup_files:
                uploader.upload_backup(filepath)
        except Exception as e:
            logger.error(f"S3 yükleme genel hatası: {str(e)}")

    # Retention temizliği
    retention = RetentionManager(
        os.getenv('BACKUP_DIR'),
        int(os.getenv('RETENTION_DAYS', 30))
    )
    retention.cleanup_old_backups()
    stats = retention.get_backup_stats()

    # Rapor gönder
    notifier = BackupNotifier()
    notifier.send_report(backup.results, stats)

    # Başarısız yedek varsa çıkış kodunu 1 yap (monitoring için)
    failed = [r for r in backup.results if r['status'] == 'failed']
    if failed:
        logger.error(f"{len(failed)} yedekleme başarısız oldu!")
        sys.exit(1)

    logger.info("Tüm yedeklemeler başarıyla tamamlandı")
    sys.exit(0)


if __name__ == '__main__':
    main()

Cron ile Zamanlama

Scripti her gece otomatik çalıştırmak için crontab ayarlayalım:

# crontab -e ile açın
# Her gece 02:30'da çalıştır
30 2 * * * /opt/db-backup/venv/bin/python /opt/db-backup/run_backup.py >> /opt/db-backup/logs/cron.log 2>&1

# Haftada bir de Retention istatistiklerini logla
0 9 * * 1 /opt/db-backup/venv/bin/python -c "
from retention_manager import RetentionManager
import os
from dotenv import load_dotenv
load_dotenv('/opt/db-backup/config/.env')
rm = RetentionManager(os.getenv('BACKUP_DIR'))
print(rm.get_backup_stats())
"

# Script çalışabilir olduğundan emin ol
chmod +x /opt/db-backup/run_backup.py

# Manuel test çalıştır
/opt/db-backup/venv/bin/python /opt/db-backup/run_backup.py

Yedek Doğrulama

Yedek almak tek başına yetmez. O yedeğin açılabilir olduğunu doğrulamak da sisteminizin parçası olmalı:

# verify_backup.py

import gzip
import subprocess
import logging
import os
from pathlib import Path

logger = logging.getLogger(__name__)


def verify_gzip_integrity(filepath):
    """Gzip dosyasının bütünlüğünü kontrol et"""
    try:
        with gzip.open(filepath, 'rb') as f:
            # İlk ve son bloğu oku
            f.read(1024)
            f.seek(-1024, 2)
            f.read(1024)
        logger.info(f"Gzip bütünlük kontrolü geçti: {filepath}")
        return True
    except Exception as e:
        logger.error(f"Gzip bütünlük hatası ({filepath}): {str(e)}")
        return False


def verify_sql_content(filepath):
    """SQL dosyasının temel içeriğini doğrula"""
    try:
        if filepath.endswith('.gz'):
            import gzip
            opener = gzip.open
        else:
            opener = open

        with opener(filepath, 'rt', encoding='utf-8', errors='ignore') as f:
            content = f.read(4096)  # İlk 4KB yeterli

        # Temel SQL yapısını kontrol et
        checks = [
            ('CREATE TABLE' in content or 'INSERT INTO' in content or 'PostgreSQL' in content),
        ]

        if all(checks):
            logger.info(f"SQL içerik doğrulaması geçti: {filepath}")
            return True
        else:
            logger.warning(f"SQL içerik doğrulaması şüpheli: {filepath}")
            return False

    except Exception as e:
        logger.error(f"İçerik doğrulama hatası: {str(e)}")
        return False


def verify_latest_backups(backup_dir):
    """Son alınan yedekleri doğrula"""
    backup_path = Path(backup_dir)
    results = []

    for backup_file in sorted(backup_path.rglob('*.sql*'), key=os.path.getmtime, reverse=True)[:10]:
        is_valid_gzip = verify_gzip_integrity(str(backup_file)) if str(backup_file).endswith('.gz') else True
        is_valid_sql = verify_sql_content(str(backup_file))

        results.append({
            'file': backup_file.name,
            'gzip_ok': is_valid_gzip,
            'content_ok': is_valid_sql,
            'overall': is_valid_gzip and is_valid_sql
        })

    return results

Güvenlik İpuçları

Production ortamında dikkat etmeniz gereken birkaç kritik nokta var:

  • Backup kullanıcısı için minimum yetkiler tanımlayın. PostgreSQL’de pg_dump için sadece SELECT ve pg_read_all_data rolü yeterlidir
  • .env dosyasının izinleri mutlaka 600 olmalı, başka kullanıcılar okuyamasın
  • S3 bucket’ınızda versioning ve MFA delete aktif olsun, silinmeye karşı koruma sağlar
  • Şifreleme için S3 server-side encryption yetmez, kritik veriler için client-side encryption da değerlendirin
  • Yedekleme loglarını farklı bir lokasyonda saklayın, sunucu giderse loglar da gitmesin
# PostgreSQL backup kullanıcısı oluşturma
psql -U postgres -c "CREATE USER backup_user WITH PASSWORD 'guclu_sifre';"
psql -U postgres -c "GRANT pg_read_all_data TO backup_user;"

# MySQL backup kullanıcısı
mysql -u root -p -e "CREATE USER 'backup_user'@'localhost' IDENTIFIED BY 'guclu_sifre';"
mysql -u root -p -e "GRANT SELECT, LOCK TABLES, SHOW VIEW, EVENT, TRIGGER ON *.* TO 'backup_user'@'localhost';"
mysql -u root -p -e "FLUSH PRIVILEGES;"

Sonuç

Bu yazıda sıfırdan üretim kalitesinde bir Python yedekleme sistemi oluşturduk. Modüler yapısı sayesinde ilerleyen günlerde MongoDB, Redis gibi farklı veritabanlarını da kolayca ekleyebilirsiniz. Slack bildirimleri için notifier.py‘e birkaç satır eklemek yeterli. Grafana ile log analizi yapmak istiyorsanız JSON formatında loglama da eklenebilir.

En kritik nokta şu: yedek aldığınızı değil, yedeği geri yükleyebildiğinizi test edin. Ayda bir test ortamında bir yedeği restore etme alışkanlığı edinin. Birçok ekibin yedek olduğunu sandığı ama açılamayan dosyalarla karşılaştığını gördüm. Sistemi kurduktan sonra ilk işiniz restore testini yapmak olsun.

Yorum yapın