Python ile Disk Kullanımı İzleme ve Uyarı Sistemi

Disk doldu uyarısını gecenin üçünde bir kullanıcıdan öğrenmek, her sistem yöneticisinin kabusudur. Oysa bu problemi önceden tespit etmek ve otomatik olarak haberdar olmak son derece mümkün. Python’un psutil kütüphanesi ve birkaç satır kod ile hem disk kullanımını izleyen hem de kritik eşiklerde sizi uyaran sağlam bir sistem kurabilirsiniz. Bu yazıda sıfırdan başlayarak production ortamında kullanabileceğiniz bir disk izleme ve uyarı sistemi geliştireceğiz.

Neden Python ile Disk İzleme?

Evet, Nagios, Zabbix veya Prometheus gibi araçlar zaten disk izleme yapıyor. Ama bazen şöyle durumlar olur: Küçük bir VPS’iniz var, enterprise bir monitoring aracı kurmak için ne bütçeniz ne de zamanınız var. Ya da mevcut monitoring sisteminizin dışında kalan özel bir klasörü izlemek istiyorsunuz. İşte bu gibi durumlarda Python ile kendi aracınızı yazmak hem hızlı hem de esnek bir çözüm sunuyor.

Python’un avantajları bu iş için gerçekten belirgin:

  • Çapraz platform desteği: Aynı script Linux, Windows ve macOS’ta çalışır
  • Zengin kütüphane ekosistemi: psutil, smtplib, logging gibi hazır araçlar
  • Kolay entegrasyon: Slack, PagerDuty, Telegram gibi servislere bağlanmak çok basit
  • Düşük kaynak tüketimi: Hafif bir script olarak arka planda rahatça çalışır

Kurulum ve Gereksinimler

Önce gerekli kütüphaneyi kuralım. Python standard library’nin dışında sadece psutil‘e ihtiyacımız var:

pip install psutil

# Sanal ortam kullanıyorsanız (önerilen):
python3 -m venv disk_monitor_env
source disk_monitor_env/bin/activate
pip install psutil requests  # requests Slack/webhook için

Python 3.6 ve üzeri bir sürüm kullandığınızdan emin olun. Çoğu modern Linux dağıtımında Python 3 zaten kurulu geliyor ama python3 --version ile kontrol etmek her zaman iyi bir alışkanlık.

Temel Disk Kullanımı Sorgusu

Her şeyin temeli olan basit bir disk kullanım sorgusundan başlayalım:

cat << 'EOF' > basic_disk_check.py
import psutil
import os

def get_disk_usage(path='/'):
    """Belirtilen dizin için disk kullanım bilgilerini döndürür."""
    usage = psutil.disk_usage(path)
    
    total_gb = usage.total / (1024 ** 3)
    used_gb = usage.used / (1024 ** 3)
    free_gb = usage.free / (1024 ** 3)
    percent = usage.percent
    
    return {
        'path': path,
        'total_gb': round(total_gb, 2),
        'used_gb': round(used_gb, 2),
        'free_gb': round(free_gb, 2),
        'percent': percent
    }

def get_all_partitions():
    """Sistemdeki tüm disk bölümlerini listeler."""
    partitions = psutil.disk_partitions()
    results = []
    
    for partition in partitions:
        try:
            usage = get_disk_usage(partition.mountpoint)
            usage['device'] = partition.device
            usage['fstype'] = partition.fstype
            results.append(usage)
        except PermissionError:
            # Bazı özel dosya sistemlerine erişim reddedilebilir
            continue
    
    return results

if __name__ == '__main__':
    print("=== Disk Kullanım Raporu ===n")
    
    for disk in get_all_partitions():
        print(f"Cihaz  : {disk['device']}")
        print(f"Bağlama: {disk['path']}")
        print(f"Dosya Sistemi: {disk['fstype']}")
        print(f"Toplam : {disk['total_gb']} GB")
        print(f"Kullanılan: {disk['used_gb']} GB ({disk['percent']}%)")
        print(f"Boş   : {disk['free_gb']} GB")
        print("-" * 40)
EOF
python3 basic_disk_check.py

Bu basit script çalıştırıldığında sisteminizdeki tüm bağlı disklerin anlık durumunu gösterir. Ama biz bununla yetinmeyeceğiz.

Eşik Bazlı Uyarı Sistemi

Asıl önemli olan kısma geliyoruz: Belirli bir eşiği aştığında uyarı üretmek. Gerçek dünyada genellikle üç seviye uyarı kullanılır:

  • INFO (Bilgi): %70 ve üzeri, dikkat etmeye başlayın
  • WARNING (Uyarı): %85 ve üzeri, aksiyon planlamaya başlayın
  • CRITICAL (Kritik): %95 ve üzeri, hemen müdahale edin
cat << 'EOF' > disk_alert.py
import psutil
import logging
from datetime import datetime
from enum import Enum

# Logging yapılandırması
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/var/log/disk_monitor.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

class AlertLevel(Enum):
    OK = 'OK'
    INFO = 'INFO'
    WARNING = 'WARNING'
    CRITICAL = 'CRITICAL'

# Eşik değerleri (yüzde olarak)
THRESHOLDS = {
    AlertLevel.INFO: 70,
    AlertLevel.WARNING: 85,
    AlertLevel.CRITICAL: 95
}

def determine_alert_level(percent):
    """Kullanım yüzdesine göre uyarı seviyesini belirler."""
    if percent >= THRESHOLDS[AlertLevel.CRITICAL]:
        return AlertLevel.CRITICAL
    elif percent >= THRESHOLDS[AlertLevel.WARNING]:
        return AlertLevel.WARNING
    elif percent >= THRESHOLDS[AlertLevel.INFO]:
        return AlertLevel.INFO
    else:
        return AlertLevel.OK

def check_disk_and_alert(path='/'):
    """Diski kontrol eder ve gerekirse uyarı üretir."""
    try:
        usage = psutil.disk_usage(path)
        percent = usage.percent
        free_gb = usage.free / (1024 ** 3)
        
        alert_level = determine_alert_level(percent)
        
        message = (
            f"Disk İzleme | Bölüm: {path} | "
            f"Kullanım: {percent}% | "
            f"Boş: {free_gb:.2f} GB"
        )
        
        if alert_level == AlertLevel.CRITICAL:
            logger.critical(message)
        elif alert_level == AlertLevel.WARNING:
            logger.warning(message)
        elif alert_level == AlertLevel.INFO:
            logger.info(message)
        else:
            logger.debug(message)
        
        return {
            'path': path,
            'percent': percent,
            'free_gb': round(free_gb, 2),
            'alert_level': alert_level,
            'message': message
        }
        
    except Exception as e:
        logger.error(f"Disk kontrol hatası ({path}): {str(e)}")
        return None

if __name__ == '__main__':
    paths_to_monitor = ['/', '/home', '/var', '/tmp']
    
    for path in paths_to_monitor:
        import os
        if os.path.exists(path):
            result = check_disk_and_alert(path)
            if result and result['alert_level'] != AlertLevel.OK:
                print(f"[{result['alert_level'].value}] {result['message']}")
EOF
python3 disk_alert.py

E-posta Bildirimi Entegrasyonu

Uyarıları görmek güzel ama siz bakana kadar log dosyasında beklemesi işe yaramaz. E-posta bildirimi ekleyelim:

cat << 'EOF' > email_notifier.py
import smtplib
import ssl
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime

class EmailNotifier:
    def __init__(self, smtp_server, smtp_port, sender_email, sender_password):
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port
        self.sender_email = sender_email
        self.sender_password = sender_password
    
    def send_alert(self, recipient_emails, disk_info):
        """Disk uyarısı için e-posta gönderir."""
        
        subject = f"[{disk_info['alert_level'].value}] Disk Uyarısı - {disk_info['path']}"
        
        # HTML e-posta içeriği
        html_body = f"""
        <html>
        <body>
            <h2 style="color: {'red' if disk_info['alert_level'].value == 'CRITICAL' else 'orange'};">
                Disk Kullanım Uyarısı
            </h2>
            <table border="1" cellpadding="8">
                <tr><td><b>Sunucu</b></td><td>{__import__('socket').gethostname()}</td></tr>
                <tr><td><b>Disk Bölümü</b></td><td>{disk_info['path']}</td></tr>
                <tr><td><b>Kullanım</b></td><td>{disk_info['percent']}%</td></tr>
                <tr><td><b>Boş Alan</b></td><td>{disk_info['free_gb']} GB</td></tr>
                <tr><td><b>Uyarı Seviyesi</b></td><td>{disk_info['alert_level'].value}</td></tr>
                <tr><td><b>Zaman</b></td><td>{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</td></tr>
            </table>
            <p>Lütfen disk alanını kontrol edin ve gerekli temizlik işlemlerini yapın.</p>
        </body>
        </html>
        """
        
        message = MIMEMultipart("alternative")
        message["Subject"] = subject
        message["From"] = self.sender_email
        message["To"] = ", ".join(recipient_emails)
        
        html_part = MIMEText(html_body, "html")
        message.attach(html_part)
        
        try:
            context = ssl.create_default_context()
            with smtplib.SMTP_SSL(self.smtp_server, self.smtp_port, context=context) as server:
                server.login(self.sender_email, self.sender_password)
                server.sendmail(
                    self.sender_email,
                    recipient_emails,
                    message.as_string()
                )
            print(f"E-posta gönderildi: {', '.join(recipient_emails)}")
            return True
        except Exception as e:
            print(f"E-posta gönderilemedi: {str(e)}")
            return False

# Kullanım örneği:
# notifier = EmailNotifier(
#     smtp_server='smtp.gmail.com',
#     smtp_port=465,
#     sender_email='[email protected]',
#     sender_password='uygulama_sifresi'
# )
EOF

Gmail kullanıyorsanız dikkat: Normal Gmail şifresi değil, “App Password” (Uygulama Şifresi) oluşturmanız gerekiyor. Google hesabınızın güvenlik ayarlarından 2FA aktif edip uygulama şifresi alabilirsiniz.

Slack Webhook Entegrasyonu

Pek çok ekip artık e-posta yerine Slack kullanıyor. Webhook entegrasyonu eklemek çok kolay:

cat << 'EOF' > slack_notifier.py
import requests
import json
from datetime import datetime

class SlackNotifier:
    def __init__(self, webhook_url):
        self.webhook_url = webhook_url
    
    def get_color(self, alert_level):
        """Uyarı seviyesine göre renk döndürür."""
        colors = {
            'OK': '#36a64f',       # Yeşil
            'INFO': '#2196F3',     # Mavi
            'WARNING': '#FF9800',  # Turuncu
            'CRITICAL': '#F44336'  # Kırmızı
        }
        return colors.get(alert_level.value, '#gray')
    
    def send_alert(self, disk_info):
        """Slack'e disk uyarısı gönderir."""
        import socket
        
        hostname = socket.gethostname()
        color = self.get_color(disk_info['alert_level'])
        
        # İlerleme çubuğu simülasyonu
        filled = int(disk_info['percent'] / 10)
        bar = '█' * filled + '░' * (10 - filled)
        
        payload = {
            "attachments": [
                {
                    "color": color,
                    "title": f":hard_drive: Disk Uyarısı - {hostname}",
                    "fields": [
                        {
                            "title": "Bölüm",
                            "value": disk_info['path'],
                            "short": True
                        },
                        {
                            "title": "Kullanım",
                            "value": f"{disk_info['percent']}%",
                            "short": True
                        },
                        {
                            "title": "Boş Alan",
                            "value": f"{disk_info['free_gb']} GB",
                            "short": True
                        },
                        {
                            "title": "Seviye",
                            "value": disk_info['alert_level'].value,
                            "short": True
                        },
                        {
                            "title": "Doluluk",
                            "value": f"`[{bar}] {disk_info['percent']}%`",
                            "short": False
                        }
                    ],
                    "footer": f"Disk Monitor | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
                }
            ]
        }
        
        try:
            response = requests.post(
                self.webhook_url,
                data=json.dumps(payload),
                headers={'Content-Type': 'application/json'},
                timeout=10
            )
            if response.status_code == 200:
                print("Slack bildirimi gönderildi.")
                return True
            else:
                print(f"Slack hatası: {response.status_code} - {response.text}")
                return False
        except requests.exceptions.RequestException as e:
            print(f"Slack bağlantı hatası: {str(e)}")
            return False
EOF

Slack Webhook URL’sini almak için Slack uygulamanızın “Incoming Webhooks” özelliğini aktif etmeniz gerekiyor. Slack API dokümantasyonundan kolayca yapabilirsiniz.

Her Şeyi Bir Araya Getiren Ana Script

Şimdiye kadar yazdığımız tüm parçaları birleştiren, production’da kullanabileceğiniz ana scripti yazalım:

cat << 'EOF' > disk_monitor.py
#!/usr/bin/env python3
"""
Disk İzleme ve Uyarı Sistemi
Kullanım: python3 disk_monitor.py [--config config.json]
"""

import psutil
import logging
import json
import os
import sys
import argparse
from datetime import datetime, timedelta
from pathlib import Path

# Yukarıda yazdığımız modülleri import ediyoruz
# from email_notifier import EmailNotifier
# from slack_notifier import SlackNotifier

# Logging yapılandırması
LOG_DIR = Path('/var/log/disk_monitor')
LOG_DIR.mkdir(parents=True, exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(LOG_DIR / 'disk_monitor.log'),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)

# Varsayılan yapılandırma
DEFAULT_CONFIG = {
    "paths_to_monitor": ["/", "/home", "/var", "/tmp"],
    "thresholds": {
        "info": 70,
        "warning": 85,
        "critical": 95
    },
    "check_interval_seconds": 300,
    "alert_cooldown_minutes": 30,
    "notifications": {
        "email": {
            "enabled": False,
            "smtp_server": "smtp.gmail.com",
            "smtp_port": 465,
            "sender": "",
            "password": "",
            "recipients": []
        },
        "slack": {
            "enabled": False,
            "webhook_url": ""
        }
    },
    "ignore_filesystems": ["tmpfs", "devtmpfs", "squashfs", "overlay"]
}

class DiskMonitor:
    def __init__(self, config):
        self.config = config
        self.last_alert_times = {}  # Uyarı tekrarını önlemek için
        
    def should_send_alert(self, path, alert_level):
        """Cooldown süresine göre uyarı gönderilip gönderilmeyeceğini belirler."""
        key = f"{path}_{alert_level}"
        now = datetime.now()
        cooldown = timedelta(minutes=self.config['alert_cooldown_minutes'])
        
        if key in self.last_alert_times:
            time_since_last = now - self.last_alert_times[key]
            if time_since_last < cooldown:
                return False
        
        self.last_alert_times[key] = now
        return True
    
    def get_disk_stats(self):
        """Tüm izlenen diskler için istatistikleri toplar."""
        stats = []
        ignore_fs = self.config.get('ignore_filesystems', [])
        
        for partition in psutil.disk_partitions():
            # Yoksayılacak dosya sistemlerini atla
            if partition.fstype in ignore_fs:
                continue
            
            # Sadece belirtilen path'leri izle
            if self.config['paths_to_monitor'] and 
               partition.mountpoint not in self.config['paths_to_monitor']:
                continue
            
            try:
                usage = psutil.disk_usage(partition.mountpoint)
                
                stats.append({
                    'device': partition.device,
                    'mountpoint': partition.mountpoint,
                    'fstype': partition.fstype,
                    'total_gb': round(usage.total / (1024**3), 2),
                    'used_gb': round(usage.used / (1024**3), 2),
                    'free_gb': round(usage.free / (1024**3), 2),
                    'percent': usage.percent,
                    'timestamp': datetime.now().isoformat()
                })
            except (PermissionError, OSError) as e:
                logger.warning(f"Disk okunamadı ({partition.mountpoint}): {e}")
        
        return stats
    
    def evaluate_stats(self, stats):
        """İstatistikleri değerlendirir ve uyarı gereken durumları döndürür."""
        alerts = []
        thresholds = self.config['thresholds']
        
        for stat in stats:
            percent = stat['percent']
            
            if percent >= thresholds['critical']:
                level = 'CRITICAL'
            elif percent >= thresholds['warning']:
                level = 'WARNING'
            elif percent >= thresholds['info']:
                level = 'INFO'
            else:
                continue  # Normal seviye, uyarı gerekmez
            
            if self.should_send_alert(stat['mountpoint'], level):
                alerts.append({
                    **stat,
                    'alert_level': level
                })
                logger.log(
                    logging.CRITICAL if level == 'CRITICAL' else
                    logging.WARNING if level == 'WARNING' else
                    logging.INFO,
                    f"Disk uyarısı: {stat['mountpoint']} - "
                    f"{percent}% kullanım ({level})"
                )
        
        return alerts
    
    def generate_report(self, stats):
        """JSON formatında rapor oluşturur."""
        report = {
            'hostname': __import__('socket').gethostname(),
            'report_time': datetime.now().isoformat(),
            'disk_stats': stats,
            'summary': {
                'total_disks': len(stats),
                'critical_count': sum(1 for s in stats if s['percent'] >= self.config['thresholds']['critical']),
                'warning_count': sum(1 for s in stats if self.config['thresholds']['warning'] <= s['percent'] < self.config['thresholds']['critical']),
                'ok_count': sum(1 for s in stats if s['percent'] < self.config['thresholds']['warning'])
            }
        }
        
        report_path = LOG_DIR / f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(report_path, 'w') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)
        
        logger.info(f"Rapor oluşturuldu: {report_path}")
        return report
    
    def run_once(self):
        """Tek seferlik kontrol çalıştırır."""
        logger.info("Disk kontrolü başlatılıyor...")
        
        stats = self.get_disk_stats()
        alerts = self.evaluate_stats(stats)
        report = self.generate_report(stats)
        
        if alerts:
            logger.warning(f"{len(alerts)} adet uyarı tespit edildi.")
            # Burada email/slack bildirimleri gönderilir
        else:
            logger.info("Tüm diskler normal seviyede.")
        
        return report, alerts

def load_config(config_path=None):
    """Yapılandırma dosyasını yükler."""
    config = DEFAULT_CONFIG.copy()
    
    if config_path and os.path.exists(config_path):
        with open(config_path) as f:
            user_config = json.load(f)
            config.update(user_config)
        logger.info(f"Yapılandırma yüklendi: {config_path}")
    
    return config

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Disk İzleme Sistemi')
    parser.add_argument('--config', help='Yapılandırma dosyası yolu')
    parser.add_argument('--once', action='store_true', help='Tek seferlik kontrol')
    parser.add_argument('--report', action='store_true', help='Rapor oluştur')
    args = parser.parse_args()
    
    config = load_config(args.config)
    monitor = DiskMonitor(config)
    
    report, alerts = monitor.run_once()
    
    print(f"nÖzet: {report['summary']}")
    if alerts:
        print(f"nUyarılar:")
        for alert in alerts:
            print(f"  [{alert['alert_level']}] {alert['mountpoint']}: {alert['percent']}%")
EOF
python3 disk_monitor.py --once

Cron ile Otomatik Çalıştırma

Script hazır, ama elle çalıştırmak işe yaramaz. Cron ile düzenli aralıklarla çalışmasını sağlayalım:

# Crontab düzenleme
crontab -e

# Her 15 dakikada bir çalıştır ve çıktıyı logla
*/15 * * * * /usr/bin/python3 /opt/disk_monitor/disk_monitor.py --once >> /var/log/disk_monitor/cron.log 2>&1

# Her sabah 08:00'de günlük rapor oluştur
0 8 * * * /usr/bin/python3 /opt/disk_monitor/disk_monitor.py --once --report >> /var/log/disk_monitor/daily_report.log 2>&1

# Script'i doğru path ile test et
/usr/bin/python3 /opt/disk_monitor/disk_monitor.py --once

# Cron loglarını kontrol et
tail -f /var/log/disk_monitor/cron.log

Crontab yazarken Python’un tam path’ini kullanmak önemli. which python3 komutuyla doğru path’i öğrenebilirsiniz.

Trend Analizi ve Doluluk Tahmini

Sadece anlık durumu değil, diskin ne zaman dolacağını tahmin etmek de çok değerli. Basit bir lineer regresyon ile bunu yapabiliriz:

cat << 'EOF' > trend_analyzer.py
import json
import os
from datetime import datetime, timedelta
from pathlib import Path

def load_historical_data(log_dir='/var/log/disk_monitor', mountpoint='/', days=7):
    """Son N günün verilerini yükler."""
    data_points = []
    log_path = Path(log_dir)
    
    cutoff = datetime.now() - timedelta(days=days)
    
    for report_file in sorted(log_path.glob('report_*.json')):
        try:
            with open(report_file) as f:
                report = json.load(f)
            
            report_time = datetime.fromisoformat(report['report_time'])
            if report_time < cutoff:
                continue
            
            for disk in report['disk_stats']:
                if disk['mountpoint'] == mountpoint:
                    data_points.append({
                        'timestamp': report_time,
                        'percent': disk['percent'],
                        'free_gb': disk['free_gb']
                    })
        except (json.JSONDecodeError, KeyError):
            continue
    
    return data_points

def predict_full_date(data_points):
    """
    Basit lineer regresyon ile diskin ne zaman dolacağını tahmin eder.
    Returns: Tahmini dolma tarihi veya None
    """
    if len(data_points) < 2:
        return None, None
    
    # Zaman damgalarını sayısal değere çevir (saat cinsinden)
    base_time = data_points[0]['timestamp']
    x_values = [(dp['timestamp'] - base_time).total_seconds() / 3600 for dp in data_points]
    y_values = [dp['percent'] for dp in data_points]
    
    # Lineer regresyon (basit slope hesabı)
    n = len(x_values)
    sum_x = sum(x_values)
    sum_y = sum(y_values)
    sum_xy = sum(x * y for x, y in zip(x_values, y_values))
    sum_x2 = sum(x ** 2 for x in x_values)
    
    denominator = n * sum_x2 - sum_x ** 2
    if denominator == 0:
        return None, None
    
    slope = (n * sum_xy - sum_x * sum_y) / denominator
    intercept = (sum_y - slope * sum_x) / n
    
    # Slope: saatte yüzde artış miktarı
    if slope <= 0:
        return None, slope  # Disk dolmuyor veya azalıyor
    
    # %95'e ne zaman ulaşır?
    current_percent = y_values[-1]
    hours_to_95 = (95 - current_percent) / slope
    
    if hours_to_95 < 0:
        predicted_date = datetime.now()  # Zaten dolmuş
    else:
        predicted_date = datetime.now() + timedelta(hours=hours_to_95)
    
    return predicted_date, slope

def print_trend_report(mountpoint='/', days=7):
    """Trend raporunu ekrana yazdırır."""
    data = load_historical_data(mountpoint=mountpoint, days=days)
    
    if not data:
        print(f"'{mountpoint}' için yeterli veri bulunamadı.")
        return
    
    print(f"n=== {mountpoint} Trend Analizi (Son {days} Gün) ===")
    print(f"Veri noktası sayısı: {len(data)}")
    print(f"Başlangıç kullanımı: {data[0]['percent']}%")
    print(f"Güncel kullanım: {data[-1]['percent']}%")
    print(f"Değişim: {data[-1]['percent'] - data[0]['percent']:.1f}%")
    
    predicted_date, slope = predict_full_date(data)
    
    if slope is not None and slope > 0:
        print(f"Büyüme hızı: Günde ~{slope * 24:.2f}%")
        if predicted_date:
            print(f"Tahmini %95 dolma tarihi: {predicted_date.strftime('%Y-%m-%d %H:%M')}")
            days_remaining = (predicted_date - datetime.now()).days
            if days_remaining < 30:
                print(f"UYARI: {days_remaining} gün kaldı!")
    elif slope is not None and slope <= 0:
        print("Disk kullanımı azalıyor veya sabit. Risk yok.")
    else:
        print("Trend hesaplanamadı.")

if __name__ == '__main__':
    print_trend_report('/', days=7)
EOF
python3 trend_analyzer.py

Systemd Service Olarak Kurulum

Scripti sürekli çalışan bir servis olarak ayarlamak istiyorsanız systemd kullanabilirsiniz:

# Servis dosyası oluştur
cat << 'EOF' > /etc/systemd/system/disk-monitor.service
[Unit]
Description=Disk İzleme ve Uyarı Servisi
After=network.target

[Service]
Type=simple
User=nobody
Group=nogroup
WorkingDirectory=/opt/disk_monitor
ExecStart=/usr/bin/python3 /opt/disk_monitor/disk_monitor.py
Restart=always
RestartSec=300
StandardOutput=journal
StandardError=journal

# Güvenlik kısıtlamaları
NoNewPrivileges=true
ReadOnlyPaths=/
ReadWritePaths=/var/log/disk_monitor

[Install]
WantedBy=multi-user.target
EOF

# Script dosyalarını yerleştir
mkdir -p /opt/disk_monitor
cp disk_monitor.py /opt/disk_monitor/

# Servis izinleri
chmod 644 /etc/systemd/system/disk-monitor.service

# Systemd'yi güncelle ve servisi başlat
systemctl daemon-reload
systemctl enable disk-monitor
systemctl start disk-monitor
systemctl status disk-monitor

# Logları takip et
journalctl -u disk-monitor -f

Gerçek Dünya Senaryosu: Log Dosyası Şişmesi

Pek çok sunucuda /var bölümü log dosyaları yüzünden şişer. Bu durumu hem tespit edip hem de neden dolduğunu anlatan bir ek modül yazalım:

cat << 'EOF' > disk_investigator.py
import os
import psutil
from pathlib import Path

def find_largest_files(directory, top_n=10, min_size_mb=10):
    """Belirtilen dizindeki en büyük dosyaları bulur."""
    large_files = []
    
    try:
        for root, dirs, files in os.walk(directory):
            # /proc ve /sys gibi özel dizinleri atla
            dirs[:] = [d for d in dirs if not d.startswith('.') 
                      and os.path.join(root, d) not in ['/proc', '/sys', '/dev']]
            
            for filename in files:
                filepath = os.path.join(root, filename)
                try:
                    size = os.path.getsize(filepath)
                    size_mb = size / (1024 * 1024)
                    
                    if size_mb >= min_size_mb:
                        large_files.append({
                            'path': filepath,
                            'size_mb': round(size_mb, 2),
                            'size_gb': round(size_mb / 1024, 3)
                        })
                except (OSError, PermissionError):
                    continue
    except PermissionError:
        pass
    
    # Büyükten küçüğe sırala
    large_files.sort(key=lambda x: x['size_mb'], reverse=True)
    return large_files[:top_n]

def investigate_disk_usage(mountpoint='/var'):
    """Disk doluluk sebebini araştırır."""
    print(f"n=== {mountpoint} Disk Kullanım Analizi ===n")
    
    usage = psutil.disk_usage(mountpoint)
    print(f"Toplam: {usage.total / (1024**3):.2f} GB")
    print(f"Kullanılan: {usage.used / (1024**3):.2f} GB ({usage.percent}%)")
    print(f"Boş: {usage.free / (1024**3):.2f} GB")
    
    print(f"nEn büyük dosyalar (10 MB üzeri):")
    print("-" * 60)
    
    large_files = find_largest_files(mountpoint, top_n=10, min_size_mb=10)
    
    for i, file_info in enumerate(large_files, 1):
        print(f"{i:2}. {file_info['size_mb']:8.1f} MB  {file_info['path']}")
    
    if not large_files:
        print("10 MB üzerinde dosya bulunamadı.")
    
    # Dizin bazlı özet
    print(f"nAlt dizin kullanımları:")
    print("-" * 60)
    
    try:
        subdirs = [(d.path, d.name) for d in os.scandir(mountpoint) 
                   if d.is_dir() and not d.is_symlink()]
        
        dir_sizes = []
        for dirpath, dirname in subdirs:
            try:
                total = sum(
                    os.path.getsize(os.path.join(root, f))
                    for root, dirs, files in os.walk(dirpath)
                    for f in files
                    if not os.path.islink(os.path.join(root, f))
                )
                dir_sizes.append((dirname, total / (1024**2)))
            except (PermissionError, OSError):
                continue
        
        dir_sizes.sort(key=lambda x: x[1], reverse=True)
        for dirname, size_mb in dir_sizes[:10]:
            print(f"  {size_mb:8.1f} MB  {mountpoint}/{dirname}")
    
    except PermissionError:
        print("Dizin erişim izni yok.")

if __name__ == '__main__':
    investigate_disk_usage('/var')
EOF
python3 disk_investigator.py

Sonuç

Bu yazıda basit bir disk sorgusu scriptinden başlayıp, eşik bazlı uyarı sistemi, e-posta ve Slack bildirimleri, trend analizi ve systemd entegrasyonu olan kapsamlı bir izleme sistemine ulaştık. Tüm bu scriptler birbiriyle entegre çalışacak şekilde tasarlandı.

Birkaç önemli noktayı hatırlatmak isterim:

  • Uyarı yorgunluğunu önlemek için cooldown mekanizmasını mutlaka kullanın. Her 5 dakikada bir e-posta almak uyarıları görmezden gelmenize yol açar.
  • Log rotasyonu ayarlamayı unutmayın. /etc/logrotate.d/disk_monitor dosyası oluşturarak log dosyalarının şişmesini önleyin. Yoksa izleme scriptiniz kendi kendini baltalar.
  • Test edin. Scripti production’a almadan önce eşik değerlerini geçici olarak düşürüp uyarı mekanizmalarının gerçekten çalıştığını doğrulayın.
  • Güvenlik: SMTP şifresi ve webhook URL’sini kesinlikle script içine yazmayın. Environment variable veya ayrı bir konfigürasyon dosyası kullanın, bu dosyayı da 600 izniyle koruyun.

Sistemi kendi ihtiyaçlarınıza göre genişletebilirsiniz: Telegram bildirimi, özel dosya sistemi filtreleri veya daha gelişmiş tahmin algoritmaları eklemek bu temel üzerine inşa etmek oldukça kolay. Önemli olan sizi gecenin üçünde uyandıracak o “disk doldu” çağrısını almadan önce harekete geçmek.

Yorum yapın