Disk doldu uyarısını gecenin üçünde bir kullanıcıdan öğrenmek, her sistem yöneticisinin kabusudur. Oysa bu problemi önceden tespit etmek ve otomatik olarak haberdar olmak son derece mümkün. Python’un psutil kütüphanesi ve birkaç satır kod ile hem disk kullanımını izleyen hem de kritik eşiklerde sizi uyaran sağlam bir sistem kurabilirsiniz. Bu yazıda sıfırdan başlayarak production ortamında kullanabileceğiniz bir disk izleme ve uyarı sistemi geliştireceğiz.
Neden Python ile Disk İzleme?
Evet, Nagios, Zabbix veya Prometheus gibi araçlar zaten disk izleme yapıyor. Ama bazen şöyle durumlar olur: Küçük bir VPS’iniz var, enterprise bir monitoring aracı kurmak için ne bütçeniz ne de zamanınız var. Ya da mevcut monitoring sisteminizin dışında kalan özel bir klasörü izlemek istiyorsunuz. İşte bu gibi durumlarda Python ile kendi aracınızı yazmak hem hızlı hem de esnek bir çözüm sunuyor.
Python’un avantajları bu iş için gerçekten belirgin:
- Çapraz platform desteği: Aynı script Linux, Windows ve macOS’ta çalışır
- Zengin kütüphane ekosistemi:
psutil,smtplib,logginggibi hazır araçlar - Kolay entegrasyon: Slack, PagerDuty, Telegram gibi servislere bağlanmak çok basit
- Düşük kaynak tüketimi: Hafif bir script olarak arka planda rahatça çalışır
Kurulum ve Gereksinimler
Önce gerekli kütüphaneyi kuralım. Python standard library’nin dışında sadece psutil‘e ihtiyacımız var:
pip install psutil
# Sanal ortam kullanıyorsanız (önerilen):
python3 -m venv disk_monitor_env
source disk_monitor_env/bin/activate
pip install psutil requests # requests Slack/webhook için
Python 3.6 ve üzeri bir sürüm kullandığınızdan emin olun. Çoğu modern Linux dağıtımında Python 3 zaten kurulu geliyor ama python3 --version ile kontrol etmek her zaman iyi bir alışkanlık.
Temel Disk Kullanımı Sorgusu
Her şeyin temeli olan basit bir disk kullanım sorgusundan başlayalım:
cat << 'EOF' > basic_disk_check.py
import psutil
import os
def get_disk_usage(path='/'):
"""Belirtilen dizin için disk kullanım bilgilerini döndürür."""
usage = psutil.disk_usage(path)
total_gb = usage.total / (1024 ** 3)
used_gb = usage.used / (1024 ** 3)
free_gb = usage.free / (1024 ** 3)
percent = usage.percent
return {
'path': path,
'total_gb': round(total_gb, 2),
'used_gb': round(used_gb, 2),
'free_gb': round(free_gb, 2),
'percent': percent
}
def get_all_partitions():
"""Sistemdeki tüm disk bölümlerini listeler."""
partitions = psutil.disk_partitions()
results = []
for partition in partitions:
try:
usage = get_disk_usage(partition.mountpoint)
usage['device'] = partition.device
usage['fstype'] = partition.fstype
results.append(usage)
except PermissionError:
# Bazı özel dosya sistemlerine erişim reddedilebilir
continue
return results
if __name__ == '__main__':
print("=== Disk Kullanım Raporu ===n")
for disk in get_all_partitions():
print(f"Cihaz : {disk['device']}")
print(f"Bağlama: {disk['path']}")
print(f"Dosya Sistemi: {disk['fstype']}")
print(f"Toplam : {disk['total_gb']} GB")
print(f"Kullanılan: {disk['used_gb']} GB ({disk['percent']}%)")
print(f"Boş : {disk['free_gb']} GB")
print("-" * 40)
EOF
python3 basic_disk_check.py
Bu basit script çalıştırıldığında sisteminizdeki tüm bağlı disklerin anlık durumunu gösterir. Ama biz bununla yetinmeyeceğiz.
Eşik Bazlı Uyarı Sistemi
Asıl önemli olan kısma geliyoruz: Belirli bir eşiği aştığında uyarı üretmek. Gerçek dünyada genellikle üç seviye uyarı kullanılır:
- INFO (Bilgi): %70 ve üzeri, dikkat etmeye başlayın
- WARNING (Uyarı): %85 ve üzeri, aksiyon planlamaya başlayın
- CRITICAL (Kritik): %95 ve üzeri, hemen müdahale edin
cat << 'EOF' > disk_alert.py
import psutil
import logging
from datetime import datetime
from enum import Enum
# Logging yapılandırması
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/var/log/disk_monitor.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class AlertLevel(Enum):
OK = 'OK'
INFO = 'INFO'
WARNING = 'WARNING'
CRITICAL = 'CRITICAL'
# Eşik değerleri (yüzde olarak)
THRESHOLDS = {
AlertLevel.INFO: 70,
AlertLevel.WARNING: 85,
AlertLevel.CRITICAL: 95
}
def determine_alert_level(percent):
"""Kullanım yüzdesine göre uyarı seviyesini belirler."""
if percent >= THRESHOLDS[AlertLevel.CRITICAL]:
return AlertLevel.CRITICAL
elif percent >= THRESHOLDS[AlertLevel.WARNING]:
return AlertLevel.WARNING
elif percent >= THRESHOLDS[AlertLevel.INFO]:
return AlertLevel.INFO
else:
return AlertLevel.OK
def check_disk_and_alert(path='/'):
"""Diski kontrol eder ve gerekirse uyarı üretir."""
try:
usage = psutil.disk_usage(path)
percent = usage.percent
free_gb = usage.free / (1024 ** 3)
alert_level = determine_alert_level(percent)
message = (
f"Disk İzleme | Bölüm: {path} | "
f"Kullanım: {percent}% | "
f"Boş: {free_gb:.2f} GB"
)
if alert_level == AlertLevel.CRITICAL:
logger.critical(message)
elif alert_level == AlertLevel.WARNING:
logger.warning(message)
elif alert_level == AlertLevel.INFO:
logger.info(message)
else:
logger.debug(message)
return {
'path': path,
'percent': percent,
'free_gb': round(free_gb, 2),
'alert_level': alert_level,
'message': message
}
except Exception as e:
logger.error(f"Disk kontrol hatası ({path}): {str(e)}")
return None
if __name__ == '__main__':
paths_to_monitor = ['/', '/home', '/var', '/tmp']
for path in paths_to_monitor:
import os
if os.path.exists(path):
result = check_disk_and_alert(path)
if result and result['alert_level'] != AlertLevel.OK:
print(f"[{result['alert_level'].value}] {result['message']}")
EOF
python3 disk_alert.py
E-posta Bildirimi Entegrasyonu
Uyarıları görmek güzel ama siz bakana kadar log dosyasında beklemesi işe yaramaz. E-posta bildirimi ekleyelim:
cat << 'EOF' > email_notifier.py
import smtplib
import ssl
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
class EmailNotifier:
def __init__(self, smtp_server, smtp_port, sender_email, sender_password):
self.smtp_server = smtp_server
self.smtp_port = smtp_port
self.sender_email = sender_email
self.sender_password = sender_password
def send_alert(self, recipient_emails, disk_info):
"""Disk uyarısı için e-posta gönderir."""
subject = f"[{disk_info['alert_level'].value}] Disk Uyarısı - {disk_info['path']}"
# HTML e-posta içeriği
html_body = f"""
<html>
<body>
<h2 style="color: {'red' if disk_info['alert_level'].value == 'CRITICAL' else 'orange'};">
Disk Kullanım Uyarısı
</h2>
<table border="1" cellpadding="8">
<tr><td><b>Sunucu</b></td><td>{__import__('socket').gethostname()}</td></tr>
<tr><td><b>Disk Bölümü</b></td><td>{disk_info['path']}</td></tr>
<tr><td><b>Kullanım</b></td><td>{disk_info['percent']}%</td></tr>
<tr><td><b>Boş Alan</b></td><td>{disk_info['free_gb']} GB</td></tr>
<tr><td><b>Uyarı Seviyesi</b></td><td>{disk_info['alert_level'].value}</td></tr>
<tr><td><b>Zaman</b></td><td>{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</td></tr>
</table>
<p>Lütfen disk alanını kontrol edin ve gerekli temizlik işlemlerini yapın.</p>
</body>
</html>
"""
message = MIMEMultipart("alternative")
message["Subject"] = subject
message["From"] = self.sender_email
message["To"] = ", ".join(recipient_emails)
html_part = MIMEText(html_body, "html")
message.attach(html_part)
try:
context = ssl.create_default_context()
with smtplib.SMTP_SSL(self.smtp_server, self.smtp_port, context=context) as server:
server.login(self.sender_email, self.sender_password)
server.sendmail(
self.sender_email,
recipient_emails,
message.as_string()
)
print(f"E-posta gönderildi: {', '.join(recipient_emails)}")
return True
except Exception as e:
print(f"E-posta gönderilemedi: {str(e)}")
return False
# Kullanım örneği:
# notifier = EmailNotifier(
# smtp_server='smtp.gmail.com',
# smtp_port=465,
# sender_email='[email protected]',
# sender_password='uygulama_sifresi'
# )
EOF
Gmail kullanıyorsanız dikkat: Normal Gmail şifresi değil, “App Password” (Uygulama Şifresi) oluşturmanız gerekiyor. Google hesabınızın güvenlik ayarlarından 2FA aktif edip uygulama şifresi alabilirsiniz.
Slack Webhook Entegrasyonu
Pek çok ekip artık e-posta yerine Slack kullanıyor. Webhook entegrasyonu eklemek çok kolay:
cat << 'EOF' > slack_notifier.py
import requests
import json
from datetime import datetime
class SlackNotifier:
def __init__(self, webhook_url):
self.webhook_url = webhook_url
def get_color(self, alert_level):
"""Uyarı seviyesine göre renk döndürür."""
colors = {
'OK': '#36a64f', # Yeşil
'INFO': '#2196F3', # Mavi
'WARNING': '#FF9800', # Turuncu
'CRITICAL': '#F44336' # Kırmızı
}
return colors.get(alert_level.value, '#gray')
def send_alert(self, disk_info):
"""Slack'e disk uyarısı gönderir."""
import socket
hostname = socket.gethostname()
color = self.get_color(disk_info['alert_level'])
# İlerleme çubuğu simülasyonu
filled = int(disk_info['percent'] / 10)
bar = '█' * filled + '░' * (10 - filled)
payload = {
"attachments": [
{
"color": color,
"title": f":hard_drive: Disk Uyarısı - {hostname}",
"fields": [
{
"title": "Bölüm",
"value": disk_info['path'],
"short": True
},
{
"title": "Kullanım",
"value": f"{disk_info['percent']}%",
"short": True
},
{
"title": "Boş Alan",
"value": f"{disk_info['free_gb']} GB",
"short": True
},
{
"title": "Seviye",
"value": disk_info['alert_level'].value,
"short": True
},
{
"title": "Doluluk",
"value": f"`[{bar}] {disk_info['percent']}%`",
"short": False
}
],
"footer": f"Disk Monitor | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
}
]
}
try:
response = requests.post(
self.webhook_url,
data=json.dumps(payload),
headers={'Content-Type': 'application/json'},
timeout=10
)
if response.status_code == 200:
print("Slack bildirimi gönderildi.")
return True
else:
print(f"Slack hatası: {response.status_code} - {response.text}")
return False
except requests.exceptions.RequestException as e:
print(f"Slack bağlantı hatası: {str(e)}")
return False
EOF
Slack Webhook URL’sini almak için Slack uygulamanızın “Incoming Webhooks” özelliğini aktif etmeniz gerekiyor. Slack API dokümantasyonundan kolayca yapabilirsiniz.
Her Şeyi Bir Araya Getiren Ana Script
Şimdiye kadar yazdığımız tüm parçaları birleştiren, production’da kullanabileceğiniz ana scripti yazalım:
cat << 'EOF' > disk_monitor.py
#!/usr/bin/env python3
"""
Disk İzleme ve Uyarı Sistemi
Kullanım: python3 disk_monitor.py [--config config.json]
"""
import psutil
import logging
import json
import os
import sys
import argparse
from datetime import datetime, timedelta
from pathlib import Path
# Yukarıda yazdığımız modülleri import ediyoruz
# from email_notifier import EmailNotifier
# from slack_notifier import SlackNotifier
# Logging yapılandırması
LOG_DIR = Path('/var/log/disk_monitor')
LOG_DIR.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(LOG_DIR / 'disk_monitor.log'),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
# Varsayılan yapılandırma
DEFAULT_CONFIG = {
"paths_to_monitor": ["/", "/home", "/var", "/tmp"],
"thresholds": {
"info": 70,
"warning": 85,
"critical": 95
},
"check_interval_seconds": 300,
"alert_cooldown_minutes": 30,
"notifications": {
"email": {
"enabled": False,
"smtp_server": "smtp.gmail.com",
"smtp_port": 465,
"sender": "",
"password": "",
"recipients": []
},
"slack": {
"enabled": False,
"webhook_url": ""
}
},
"ignore_filesystems": ["tmpfs", "devtmpfs", "squashfs", "overlay"]
}
class DiskMonitor:
def __init__(self, config):
self.config = config
self.last_alert_times = {} # Uyarı tekrarını önlemek için
def should_send_alert(self, path, alert_level):
"""Cooldown süresine göre uyarı gönderilip gönderilmeyeceğini belirler."""
key = f"{path}_{alert_level}"
now = datetime.now()
cooldown = timedelta(minutes=self.config['alert_cooldown_minutes'])
if key in self.last_alert_times:
time_since_last = now - self.last_alert_times[key]
if time_since_last < cooldown:
return False
self.last_alert_times[key] = now
return True
def get_disk_stats(self):
"""Tüm izlenen diskler için istatistikleri toplar."""
stats = []
ignore_fs = self.config.get('ignore_filesystems', [])
for partition in psutil.disk_partitions():
# Yoksayılacak dosya sistemlerini atla
if partition.fstype in ignore_fs:
continue
# Sadece belirtilen path'leri izle
if self.config['paths_to_monitor'] and
partition.mountpoint not in self.config['paths_to_monitor']:
continue
try:
usage = psutil.disk_usage(partition.mountpoint)
stats.append({
'device': partition.device,
'mountpoint': partition.mountpoint,
'fstype': partition.fstype,
'total_gb': round(usage.total / (1024**3), 2),
'used_gb': round(usage.used / (1024**3), 2),
'free_gb': round(usage.free / (1024**3), 2),
'percent': usage.percent,
'timestamp': datetime.now().isoformat()
})
except (PermissionError, OSError) as e:
logger.warning(f"Disk okunamadı ({partition.mountpoint}): {e}")
return stats
def evaluate_stats(self, stats):
"""İstatistikleri değerlendirir ve uyarı gereken durumları döndürür."""
alerts = []
thresholds = self.config['thresholds']
for stat in stats:
percent = stat['percent']
if percent >= thresholds['critical']:
level = 'CRITICAL'
elif percent >= thresholds['warning']:
level = 'WARNING'
elif percent >= thresholds['info']:
level = 'INFO'
else:
continue # Normal seviye, uyarı gerekmez
if self.should_send_alert(stat['mountpoint'], level):
alerts.append({
**stat,
'alert_level': level
})
logger.log(
logging.CRITICAL if level == 'CRITICAL' else
logging.WARNING if level == 'WARNING' else
logging.INFO,
f"Disk uyarısı: {stat['mountpoint']} - "
f"{percent}% kullanım ({level})"
)
return alerts
def generate_report(self, stats):
"""JSON formatında rapor oluşturur."""
report = {
'hostname': __import__('socket').gethostname(),
'report_time': datetime.now().isoformat(),
'disk_stats': stats,
'summary': {
'total_disks': len(stats),
'critical_count': sum(1 for s in stats if s['percent'] >= self.config['thresholds']['critical']),
'warning_count': sum(1 for s in stats if self.config['thresholds']['warning'] <= s['percent'] < self.config['thresholds']['critical']),
'ok_count': sum(1 for s in stats if s['percent'] < self.config['thresholds']['warning'])
}
}
report_path = LOG_DIR / f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(report_path, 'w') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
logger.info(f"Rapor oluşturuldu: {report_path}")
return report
def run_once(self):
"""Tek seferlik kontrol çalıştırır."""
logger.info("Disk kontrolü başlatılıyor...")
stats = self.get_disk_stats()
alerts = self.evaluate_stats(stats)
report = self.generate_report(stats)
if alerts:
logger.warning(f"{len(alerts)} adet uyarı tespit edildi.")
# Burada email/slack bildirimleri gönderilir
else:
logger.info("Tüm diskler normal seviyede.")
return report, alerts
def load_config(config_path=None):
"""Yapılandırma dosyasını yükler."""
config = DEFAULT_CONFIG.copy()
if config_path and os.path.exists(config_path):
with open(config_path) as f:
user_config = json.load(f)
config.update(user_config)
logger.info(f"Yapılandırma yüklendi: {config_path}")
return config
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Disk İzleme Sistemi')
parser.add_argument('--config', help='Yapılandırma dosyası yolu')
parser.add_argument('--once', action='store_true', help='Tek seferlik kontrol')
parser.add_argument('--report', action='store_true', help='Rapor oluştur')
args = parser.parse_args()
config = load_config(args.config)
monitor = DiskMonitor(config)
report, alerts = monitor.run_once()
print(f"nÖzet: {report['summary']}")
if alerts:
print(f"nUyarılar:")
for alert in alerts:
print(f" [{alert['alert_level']}] {alert['mountpoint']}: {alert['percent']}%")
EOF
python3 disk_monitor.py --once
Cron ile Otomatik Çalıştırma
Script hazır, ama elle çalıştırmak işe yaramaz. Cron ile düzenli aralıklarla çalışmasını sağlayalım:
# Crontab düzenleme
crontab -e
# Her 15 dakikada bir çalıştır ve çıktıyı logla
*/15 * * * * /usr/bin/python3 /opt/disk_monitor/disk_monitor.py --once >> /var/log/disk_monitor/cron.log 2>&1
# Her sabah 08:00'de günlük rapor oluştur
0 8 * * * /usr/bin/python3 /opt/disk_monitor/disk_monitor.py --once --report >> /var/log/disk_monitor/daily_report.log 2>&1
# Script'i doğru path ile test et
/usr/bin/python3 /opt/disk_monitor/disk_monitor.py --once
# Cron loglarını kontrol et
tail -f /var/log/disk_monitor/cron.log
Crontab yazarken Python’un tam path’ini kullanmak önemli. which python3 komutuyla doğru path’i öğrenebilirsiniz.
Trend Analizi ve Doluluk Tahmini
Sadece anlık durumu değil, diskin ne zaman dolacağını tahmin etmek de çok değerli. Basit bir lineer regresyon ile bunu yapabiliriz:
cat << 'EOF' > trend_analyzer.py
import json
import os
from datetime import datetime, timedelta
from pathlib import Path
def load_historical_data(log_dir='/var/log/disk_monitor', mountpoint='/', days=7):
"""Son N günün verilerini yükler."""
data_points = []
log_path = Path(log_dir)
cutoff = datetime.now() - timedelta(days=days)
for report_file in sorted(log_path.glob('report_*.json')):
try:
with open(report_file) as f:
report = json.load(f)
report_time = datetime.fromisoformat(report['report_time'])
if report_time < cutoff:
continue
for disk in report['disk_stats']:
if disk['mountpoint'] == mountpoint:
data_points.append({
'timestamp': report_time,
'percent': disk['percent'],
'free_gb': disk['free_gb']
})
except (json.JSONDecodeError, KeyError):
continue
return data_points
def predict_full_date(data_points):
"""
Basit lineer regresyon ile diskin ne zaman dolacağını tahmin eder.
Returns: Tahmini dolma tarihi veya None
"""
if len(data_points) < 2:
return None, None
# Zaman damgalarını sayısal değere çevir (saat cinsinden)
base_time = data_points[0]['timestamp']
x_values = [(dp['timestamp'] - base_time).total_seconds() / 3600 for dp in data_points]
y_values = [dp['percent'] for dp in data_points]
# Lineer regresyon (basit slope hesabı)
n = len(x_values)
sum_x = sum(x_values)
sum_y = sum(y_values)
sum_xy = sum(x * y for x, y in zip(x_values, y_values))
sum_x2 = sum(x ** 2 for x in x_values)
denominator = n * sum_x2 - sum_x ** 2
if denominator == 0:
return None, None
slope = (n * sum_xy - sum_x * sum_y) / denominator
intercept = (sum_y - slope * sum_x) / n
# Slope: saatte yüzde artış miktarı
if slope <= 0:
return None, slope # Disk dolmuyor veya azalıyor
# %95'e ne zaman ulaşır?
current_percent = y_values[-1]
hours_to_95 = (95 - current_percent) / slope
if hours_to_95 < 0:
predicted_date = datetime.now() # Zaten dolmuş
else:
predicted_date = datetime.now() + timedelta(hours=hours_to_95)
return predicted_date, slope
def print_trend_report(mountpoint='/', days=7):
"""Trend raporunu ekrana yazdırır."""
data = load_historical_data(mountpoint=mountpoint, days=days)
if not data:
print(f"'{mountpoint}' için yeterli veri bulunamadı.")
return
print(f"n=== {mountpoint} Trend Analizi (Son {days} Gün) ===")
print(f"Veri noktası sayısı: {len(data)}")
print(f"Başlangıç kullanımı: {data[0]['percent']}%")
print(f"Güncel kullanım: {data[-1]['percent']}%")
print(f"Değişim: {data[-1]['percent'] - data[0]['percent']:.1f}%")
predicted_date, slope = predict_full_date(data)
if slope is not None and slope > 0:
print(f"Büyüme hızı: Günde ~{slope * 24:.2f}%")
if predicted_date:
print(f"Tahmini %95 dolma tarihi: {predicted_date.strftime('%Y-%m-%d %H:%M')}")
days_remaining = (predicted_date - datetime.now()).days
if days_remaining < 30:
print(f"UYARI: {days_remaining} gün kaldı!")
elif slope is not None and slope <= 0:
print("Disk kullanımı azalıyor veya sabit. Risk yok.")
else:
print("Trend hesaplanamadı.")
if __name__ == '__main__':
print_trend_report('/', days=7)
EOF
python3 trend_analyzer.py
Systemd Service Olarak Kurulum
Scripti sürekli çalışan bir servis olarak ayarlamak istiyorsanız systemd kullanabilirsiniz:
# Servis dosyası oluştur
cat << 'EOF' > /etc/systemd/system/disk-monitor.service
[Unit]
Description=Disk İzleme ve Uyarı Servisi
After=network.target
[Service]
Type=simple
User=nobody
Group=nogroup
WorkingDirectory=/opt/disk_monitor
ExecStart=/usr/bin/python3 /opt/disk_monitor/disk_monitor.py
Restart=always
RestartSec=300
StandardOutput=journal
StandardError=journal
# Güvenlik kısıtlamaları
NoNewPrivileges=true
ReadOnlyPaths=/
ReadWritePaths=/var/log/disk_monitor
[Install]
WantedBy=multi-user.target
EOF
# Script dosyalarını yerleştir
mkdir -p /opt/disk_monitor
cp disk_monitor.py /opt/disk_monitor/
# Servis izinleri
chmod 644 /etc/systemd/system/disk-monitor.service
# Systemd'yi güncelle ve servisi başlat
systemctl daemon-reload
systemctl enable disk-monitor
systemctl start disk-monitor
systemctl status disk-monitor
# Logları takip et
journalctl -u disk-monitor -f
Gerçek Dünya Senaryosu: Log Dosyası Şişmesi
Pek çok sunucuda /var bölümü log dosyaları yüzünden şişer. Bu durumu hem tespit edip hem de neden dolduğunu anlatan bir ek modül yazalım:
cat << 'EOF' > disk_investigator.py
import os
import psutil
from pathlib import Path
def find_largest_files(directory, top_n=10, min_size_mb=10):
"""Belirtilen dizindeki en büyük dosyaları bulur."""
large_files = []
try:
for root, dirs, files in os.walk(directory):
# /proc ve /sys gibi özel dizinleri atla
dirs[:] = [d for d in dirs if not d.startswith('.')
and os.path.join(root, d) not in ['/proc', '/sys', '/dev']]
for filename in files:
filepath = os.path.join(root, filename)
try:
size = os.path.getsize(filepath)
size_mb = size / (1024 * 1024)
if size_mb >= min_size_mb:
large_files.append({
'path': filepath,
'size_mb': round(size_mb, 2),
'size_gb': round(size_mb / 1024, 3)
})
except (OSError, PermissionError):
continue
except PermissionError:
pass
# Büyükten küçüğe sırala
large_files.sort(key=lambda x: x['size_mb'], reverse=True)
return large_files[:top_n]
def investigate_disk_usage(mountpoint='/var'):
"""Disk doluluk sebebini araştırır."""
print(f"n=== {mountpoint} Disk Kullanım Analizi ===n")
usage = psutil.disk_usage(mountpoint)
print(f"Toplam: {usage.total / (1024**3):.2f} GB")
print(f"Kullanılan: {usage.used / (1024**3):.2f} GB ({usage.percent}%)")
print(f"Boş: {usage.free / (1024**3):.2f} GB")
print(f"nEn büyük dosyalar (10 MB üzeri):")
print("-" * 60)
large_files = find_largest_files(mountpoint, top_n=10, min_size_mb=10)
for i, file_info in enumerate(large_files, 1):
print(f"{i:2}. {file_info['size_mb']:8.1f} MB {file_info['path']}")
if not large_files:
print("10 MB üzerinde dosya bulunamadı.")
# Dizin bazlı özet
print(f"nAlt dizin kullanımları:")
print("-" * 60)
try:
subdirs = [(d.path, d.name) for d in os.scandir(mountpoint)
if d.is_dir() and not d.is_symlink()]
dir_sizes = []
for dirpath, dirname in subdirs:
try:
total = sum(
os.path.getsize(os.path.join(root, f))
for root, dirs, files in os.walk(dirpath)
for f in files
if not os.path.islink(os.path.join(root, f))
)
dir_sizes.append((dirname, total / (1024**2)))
except (PermissionError, OSError):
continue
dir_sizes.sort(key=lambda x: x[1], reverse=True)
for dirname, size_mb in dir_sizes[:10]:
print(f" {size_mb:8.1f} MB {mountpoint}/{dirname}")
except PermissionError:
print("Dizin erişim izni yok.")
if __name__ == '__main__':
investigate_disk_usage('/var')
EOF
python3 disk_investigator.py
Sonuç
Bu yazıda basit bir disk sorgusu scriptinden başlayıp, eşik bazlı uyarı sistemi, e-posta ve Slack bildirimleri, trend analizi ve systemd entegrasyonu olan kapsamlı bir izleme sistemine ulaştık. Tüm bu scriptler birbiriyle entegre çalışacak şekilde tasarlandı.
Birkaç önemli noktayı hatırlatmak isterim:
- Uyarı yorgunluğunu önlemek için cooldown mekanizmasını mutlaka kullanın. Her 5 dakikada bir e-posta almak uyarıları görmezden gelmenize yol açar.
- Log rotasyonu ayarlamayı unutmayın.
/etc/logrotate.d/disk_monitordosyası oluşturarak log dosyalarının şişmesini önleyin. Yoksa izleme scriptiniz kendi kendini baltalar. - Test edin. Scripti production’a almadan önce eşik değerlerini geçici olarak düşürüp uyarı mekanizmalarının gerçekten çalıştığını doğrulayın.
- Güvenlik: SMTP şifresi ve webhook URL’sini kesinlikle script içine yazmayın. Environment variable veya ayrı bir konfigürasyon dosyası kullanın, bu dosyayı da 600 izniyle koruyun.
Sistemi kendi ihtiyaçlarınıza göre genişletebilirsiniz: Telegram bildirimi, özel dosya sistemi filtreleri veya daha gelişmiş tahmin algoritmaları eklemek bu temel üzerine inşa etmek oldukça kolay. Önemli olan sizi gecenin üçünde uyandıracak o “disk doldu” çağrısını almadan önce harekete geçmek.