Sistem yöneticiliği yaparken er ya da geç şu noktaya geliyorsunuz: aynı işi defalarca tekrar ediyorsunuz. Sunucu loglarını kontrol etmek, disk kullanımını izlemek, kullanıcı hesaplarını yönetmek… Bunların hepsini manuel yapmak hem zaman kaybı hem de hata riski demek. İşte tam bu noktada Python devreye giriyor ve hayatınızı ciddi anlamda kolaylaştırıyor.
Python, sistem otomasyonu için neredeyse mükemmel bir araç. Sözdizimi sade, standart kütüphanesi zengin ve Linux/Windows üzerinde çalışan neredeyse her şeyle entegre olabiliyor. Bash scripting güçlü ama karmaşık string işlemleri, hata yönetimi ve veri yapıları söz konusu olduğunda Python açık ara öne çıkıyor. Bu yazıda sıfırdan başlayarak gerçek dünya senaryolarında kullanabileceğiniz Python otomasyon scriptlerini inceleyeceğiz.
Neden Python, Neden Bash Değil?
Bu soruyu çok alıyorum. Bash zaten var, neden Python öğreneyim ki? Haklı bir soru ama şunu düşünün: bir log dosyasını parse edip içindeki IP adreslerini saymanız, sonra bu sayımı bir JSON dosyasına yazmanız gerekiyor. Bunu Bash ile yazmaya çalışın, sonra Python ile. Farkı görürsünüz.
Python’un sysadmin dünyasında bu kadar popüler olmasının birkaç somut nedeni var:
- Okunabilirlik: Yazdığınız scripti 6 ay sonra açtığınızda ne yaptığını anlayabiliyorsunuz
- Hata yönetimi: try/except blokları ile hatalar kontrol altında
- Zengin standart kütüphane: os, sys, subprocess, shutil, pathlib… kurulum gerektirmeden kullanabilirsiniz
- Veri işleme: JSON, CSV, XML parse etmek Python ile çok kolay
- Cross-platform: Aynı script Linux’ta da Windows’ta da çalışıyor
Temel Kurulum ve Ortam Hazırlığı
Üretim sunucularında script çalıştırmadan önce sanal ortam (virtual environment) kullanmayı alışkanlık haline getirin. Sistem Python’una dokunmak istemiyorsunuz.
# Python versiyonunu kontrol et
python3 --version
# pip'i güncelle
python3 -m pip install --upgrade pip
# Proje için sanal ortam oluştur
python3 -m venv /opt/scripts/sysadmin-env
# Sanal ortamı aktifleştir
source /opt/scripts/sysadmin-env/bin/activate
# Sık kullanılan paketleri yükle
pip install psutil paramiko requests schedule
# Paketleri dondur (production için önemli)
pip freeze > requirements.txt
psutil sistem kaynaklarını okumak için, paramiko SSH bağlantıları için, requests HTTP istekleri için, schedule ise cron benzeri zamanlama için kullanacağız.
İlk Gerçek Senaryo: Disk Kullanım Monitörü
Klasik ama kritik bir senaryo. Disk dolduğunda sistem çöker, uyarı almak istiyorsunuz. Cron + df + mail kombinasyonu yerine daha akıllı bir çözüm yapalım.
#!/usr/bin/env python3
"""
Disk kullanim monitoru - kritik esiklerde email uyarisi gonderir
Kullanim: python3 disk_monitor.py
Cron: */15 * * * * /opt/scripts/sysadmin-env/bin/python3 /opt/scripts/disk_monitor.py
"""
import psutil
import smtplib
import json
import logging
from datetime import datetime
from email.mime.text import MIMEText
from pathlib import Path
# Logging konfigurasyonu
logging.basicConfig(
filename='/var/log/disk_monitor.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Konfigürasyon
CONFIG = {
'warning_threshold': 75, # % - uyari
'critical_threshold': 90, # % - kritik
'partitions': ['/', '/var', '/home', '/tmp'],
'smtp_server': 'smtp.sirket.com',
'smtp_port': 587,
'email_from': '[email protected]',
'email_to': ['[email protected]', '[email protected]'],
'hostname': 'webserver01'
}
def get_disk_usage(partition):
"""Belirtilen partition'in disk kullanim bilgisini dondur"""
try:
usage = psutil.disk_usage(partition)
return {
'partition': partition,
'total_gb': round(usage.total / (1024**3), 2),
'used_gb': round(usage.used / (1024**3), 2),
'free_gb': round(usage.free / (1024**3), 2),
'percent': usage.percent
}
except PermissionError:
logging.warning(f"Izin hatasi: {partition} okunamadi")
return None
except FileNotFoundError:
logging.warning(f"Partition bulunamadi: {partition}")
return None
def send_alert(subject, body, level='WARNING'):
"""Email uyarisi gonder"""
try:
msg = MIMEText(body)
msg['Subject'] = f"[{level}] {CONFIG['hostname']}: {subject}"
msg['From'] = CONFIG['email_from']
msg['To'] = ', '.join(CONFIG['email_to'])
with smtplib.SMTP(CONFIG['smtp_server'], CONFIG['smtp_port']) as server:
server.starttls()
server.sendmail(CONFIG['email_from'], CONFIG['email_to'], msg.as_string())
logging.info(f"Alert gonderildi: {subject}")
except Exception as e:
logging.error(f"Email gonderilemedi: {e}")
def check_disks():
"""Tum diskleri kontrol et ve gerekirse uyari gonder"""
alerts = []
for partition in CONFIG['partitions']:
usage = get_disk_usage(partition)
if not usage:
continue
logging.info(f"{partition}: %{usage['percent']} kullaniliyor")
if usage['percent'] >= CONFIG['critical_threshold']:
alerts.append(('CRITICAL', usage))
elif usage['percent'] >= CONFIG['warning_threshold']:
alerts.append(('WARNING', usage))
for level, usage in alerts:
subject = f"Disk Dolulugu: {usage['partition']} %{usage['percent']}"
body = f"""
Sunucu: {CONFIG['hostname']}
Zaman: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Partition: {usage['partition']}
Kullanim: %{usage['percent']}
Toplam: {usage['total_gb']} GB
Kullanilan: {usage['used_gb']} GB
Bos: {usage['free_gb']} GB
Acil mudahale gerekebilir!
"""
send_alert(subject, body, level)
if __name__ == '__main__':
check_disks()
Bu scripti /opt/scripts/disk_monitor.py olarak kaydedin ve cron’a ekleyin. 15 dakikada bir kontrol eder, eşiği aşınca mail atar.
OS Modülü ile Dosya Sistemi Operasyonları
os ve pathlib modülleri günlük dosya işlemlerinde en çok başvuracağınız araçlar. Eski log dosyalarını temizlemek, yedek dosyaları arşivlemek, dizin yapısı oluşturmak…
#!/usr/bin/env python3
"""
Log temizleme scripti - N gunden eski log dosyalarini siler/arsivler
"""
import os
import shutil
import gzip
import logging
from pathlib import Path
from datetime import datetime, timedelta
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
def clean_old_logs(log_dir, days_to_keep=30, archive_dir=None):
"""
Eski log dosyalarini temizle
log_dir: Log dosyalarinin bulundugu dizin
days_to_keep: Kac gun oncesine kadar tutulacak
archive_dir: Belirtilirse silmek yerine buraya arsivle
"""
log_path = Path(log_dir)
threshold_date = datetime.now() - timedelta(days=days_to_keep)
if not log_path.exists():
logging.error(f"Dizin bulunamadi: {log_dir}")
return
deleted_count = 0
archived_count = 0
total_freed_bytes = 0
# Tum .log ve .log.gz dosyalarini tara
for log_file in log_path.rglob('*.log'):
file_mtime = datetime.fromtimestamp(log_file.stat().st_mtime)
if file_mtime < threshold_date:
file_size = log_file.stat().st_size
if archive_dir:
# Arsivle: gzip ile sıkıştır
archive_path = Path(archive_dir)
archive_path.mkdir(parents=True, exist_ok=True)
archive_file = archive_path / f"{log_file.name}.gz"
with open(log_file, 'rb') as f_in:
with gzip.open(archive_file, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
log_file.unlink()
archived_count += 1
logging.info(f"Arşivlendi: {log_file.name} -> {archive_file}")
else:
# Direkt sil
log_file.unlink()
deleted_count += 1
logging.info(f"Silindi: {log_file}")
total_freed_bytes += file_size
total_freed_mb = round(total_freed_bytes / (1024**2), 2)
logging.info(f"Islem tamamlandi: {deleted_count} silindi, {archived_count} arşivlendi, {total_freed_mb} MB temizlendi")
# Kullanim ornekleri
clean_old_logs('/var/log/nginx', days_to_keep=7, archive_dir='/backup/logs/nginx')
clean_old_logs('/var/log/apache2', days_to_keep=14)
subprocess Modülü: Sistem Komutlarını Python’dan Yönetmek
Bazen doğrudan sistem komutlarını çalıştırmanız gerekiyor. os.system() kullanmayın, eski ve güvensiz. subprocess modülü çok daha güvenli ve kullanışlı.
#!/usr/bin/env python3
"""
Servis saglik kontrol scripti
Kritik servislerin calisip calismadığını kontrol eder, gerekirse yeniden baslatir
"""
import subprocess
import logging
import json
from datetime import datetime
logging.basicConfig(
filename='/var/log/service_monitor.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
CRITICAL_SERVICES = [
'nginx',
'postgresql',
'redis-server',
'fail2ban',
'ufw'
]
def run_command(cmd, timeout=30):
"""
Sistem komutu calistir, ciktiyi don
Donus degeri: (return_code, stdout, stderr)
"""
try:
result = subprocess.run(
cmd,
shell=False, # Shell injection'a karsi guvenli
capture_output=True,
text=True,
timeout=timeout
)
return result.returncode, result.stdout.strip(), result.stderr.strip()
except subprocess.TimeoutExpired:
logging.error(f"Komut zaman asimi: {' '.join(cmd)}")
return -1, '', 'Timeout'
except FileNotFoundError:
logging.error(f"Komut bulunamadi: {cmd[0]}")
return -1, '', 'Command not found'
def check_service(service_name):
"""Servisin aktif olup olmadigini kontrol et"""
returncode, stdout, _ = run_command(['systemctl', 'is-active', service_name])
return stdout == 'active'
def restart_service(service_name):
"""Servisi yeniden baslat"""
returncode, stdout, stderr = run_command(
['systemctl', 'restart', service_name],
timeout=60
)
return returncode == 0
def get_service_status(service_name):
"""Detayli servis durumu al"""
_, stdout, _ = run_command(['systemctl', 'status', service_name, '--no-pager', '-l'])
return stdout
def monitor_services():
"""Tum kritik servisleri izle"""
report = {
'timestamp': datetime.now().isoformat(),
'hostname': run_command(['hostname'])[1],
'services': []
}
issues_found = False
for service in CRITICAL_SERVICES:
is_active = check_service(service)
service_info = {
'name': service,
'status': 'active' if is_active else 'inactive',
'action_taken': None
}
if not is_active:
issues_found = True
logging.warning(f"Servis calısmiyor: {service}")
# Yeniden baslatmayi dene
logging.info(f"Yeniden baslatiliyor: {service}")
success = restart_service(service)
if success:
service_info['action_taken'] = 'restarted_successfully'
logging.info(f"Servis basariyla baslatildi: {service}")
else:
service_info['action_taken'] = 'restart_failed'
logging.error(f"Servis baslatILAMADI: {service}")
report['services'].append(service_info)
logging.info(f"Servis kontrol: {service} -> {'OK' if is_active else 'PROBLEM'}")
# Raporu kaydet
with open('/var/log/service_report.json', 'w') as f:
json.dump(report, f, indent=2)
return not issues_found
if __name__ == '__main__':
all_ok = monitor_services()
exit(0 if all_ok else 1)
Kullanıcı Yönetimi Otomasyonu
Yeni çalışan onboarding veya toplu kullanıcı oluşturma işlemlerini script ile halletmek hem zaman kazandırır hem de tutarlılık sağlar.
#!/usr/bin/env python3
"""
Toplu kullanici olusturma scripti
CSV dosyasindan kullanici listesi okur, sistemde hesap olusturur
"""
import subprocess
import csv
import secrets
import string
import logging
from pathlib import Path
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
def generate_password(length=16):
"""Guclu rastgele sifre uret"""
alphabet = string.ascii_letters + string.digits + string.punctuation
# Guvensiz karakterleri cikart (shell'de sorun cikarabilir)
alphabet = alphabet.replace("'", "").replace('"', "").replace('\', "")
return ''.join(secrets.choice(alphabet) for _ in range(length))
def user_exists(username):
"""Kullanicinin sistemde var olup olmadigini kontrol et"""
result = subprocess.run(
['id', username],
capture_output=True
)
return result.returncode == 0
def create_user(username, full_name, groups=None, shell='/bin/bash'):
"""
Sistem kullanicisi olustur
username: Kullanici adi
full_name: Ad soyad (GECOS)
groups: Ek gruplar listesi
shell: Varsayilan shell
"""
if user_exists(username):
logging.warning(f"Kullanici zaten var: {username}")
return False, None
password = generate_password()
# Kullanici olustur
cmd = [
'useradd',
'-m', # Home dizini olustur
'-c', full_name, # Tam ad
'-s', shell, # Shell
username
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
logging.error(f"Kullanici olusturulamadi {username}: {result.stderr}")
return False, None
# Sifre belirle
passwd_proc = subprocess.run(
['chpasswd'],
input=f"{username}:{password}",
capture_output=True,
text=True
)
if passwd_proc.returncode != 0:
logging.error(f"Sifre belirlenemedi {username}: {passwd_proc.stderr}")
# Gruplara ekle
if groups:
for group in groups:
subprocess.run(['usermod', '-aG', group, username])
# Ilk giriste sifre degistirmeyi zorla
subprocess.run(['chage', '-d', '0', username])
logging.info(f"Kullanici olusturuldu: {username} ({full_name})")
return True, password
def process_user_csv(csv_file):
"""CSV dosyasindan kullanici listesi isle"""
csv_path = Path(csv_file)
if not csv_path.exists():
logging.error(f"CSV dosyasi bulunamadi: {csv_file}")
return
results = []
with open(csv_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
# CSV format: username,full_name,groups,department
for row in reader:
username = row['username'].lower().strip()
full_name = row['full_name'].strip()
groups = row.get('groups', '').split(';') if row.get('groups') else []
groups = [g.strip() for g in groups if g.strip()]
success, password = create_user(username, full_name, groups)
results.append({
'username': username,
'success': success,
'password': password,
'department': row.get('department', '')
})
# Sonuclari kaydet (guvenli dosyaya!)
output_file = '/root/new_users_credentials.txt'
with open(output_file, 'w') as f:
f.write("YENi KULLANICI BILGILERI - GIZLIn")
f.write("=" * 40 + "nn")
for r in results:
if r['success']:
f.write(f"Kullanici: {r['username']}n")
f.write(f"Gecici Sifre: {r['password']}n")
f.write(f"Departman: {r['department']}n")
f.write("-" * 20 + "n")
# Dosya izinlerini kisitla
subprocess.run(['chmod', '600', output_file])
logging.info(f"Kimlik bilgileri kaydedildi: {output_file}")
# Kullanim
# process_user_csv('/tmp/new_employees.csv')
Ağ Bağlantısı Kontrolü ve Raporlama
#!/usr/bin/env python3
"""
Ag baglanabilirlik kontrol scripti
Kritik host ve servislerin eriselebilir olup olmadigini kontrol eder
"""
import socket
import subprocess
import time
import json
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
# Kontrol edilecek hedefler
TARGETS = [
{'host': '8.8.8.8', 'port': 53, 'name': 'Google DNS', 'type': 'tcp'},
{'host': 'veritabani.ic.sirket.com', 'port': 5432, 'name': 'PostgreSQL', 'type': 'tcp'},
{'host': 'redis.ic.sirket.com', 'port': 6379, 'name': 'Redis', 'type': 'tcp'},
{'host': 'api.sirket.com', 'port': 443, 'name': 'API Sunucu', 'type': 'tcp'},
{'host': '192.168.1.1', 'port': None, 'name': 'Gateway', 'type': 'ping'},
]
def check_tcp_port(host, port, timeout=5):
"""TCP port baglanabilirligini kontrol et"""
try:
start_time = time.time()
with socket.create_connection((host, port), timeout=timeout):
response_time = round((time.time() - start_time) * 1000, 2)
return True, response_time
except (socket.timeout, ConnectionRefusedError, OSError):
return False, None
def check_ping(host, count=3):
"""ICMP ping kontrol et"""
result = subprocess.run(
['ping', '-c', str(count), '-W', '2', host],
capture_output=True,
text=True
)
if result.returncode == 0:
# Ortalama ping suresini parse et
for line in result.stdout.split('n'):
if 'avg' in line or 'rtt' in line:
try:
avg_ms = float(line.split('/')[4])
return True, avg_ms
except (IndexError, ValueError):
pass
return True, None
return False, None
def check_target(target):
"""Tek bir hedefe baglanabilirlik testi yap"""
if target['type'] == 'tcp':
success, response_time = check_tcp_port(target['host'], target['port'])
else:
success, response_time = check_ping(target['host'])
return {
'name': target['name'],
'host': target['host'],
'port': target.get('port'),
'type': target['type'],
'reachable': success,
'response_time_ms': response_time,
'checked_at': datetime.now().isoformat()
}
def run_connectivity_check():
"""Tum hedefleri paralel kontrol et"""
results = []
# ThreadPoolExecutor ile paralel kontrol (zaman kazandiriyor)
with ThreadPoolExecutor(max_workers=10) as executor:
futures = {executor.submit(check_target, target): target for target in TARGETS}
for future in as_completed(futures):
result = future.result()
results.append(result)
status = "OK" if result['reachable'] else "HATA"
rt = f"{result['response_time_ms']}ms" if result['response_time_ms'] else "N/A"
print(f"[{status}] {result['name']} ({result['host']}) - {rt}")
# Ozet rapor
total = len(results)
reachable = sum(1 for r in results if r['reachable'])
print(f"nOzet: {reachable}/{total} hedef erişilebilir")
return results
if __name__ == '__main__':
results = run_connectivity_check()
# JSON rapor kaydet
with open('/var/log/connectivity_report.json', 'w') as f:
json.dump(results, f, indent=2)
Cron Job Yönetimi ve Zamanlama
schedule kütüphanesi ile daemon olarak çalışan bir otomasyon scripti yazabilirsiniz:
#!/usr/bin/env python3
"""
Sistem bakim daemon'i
Periyodik bakim gorevlerini zamanlayarak calistirir
"""
import schedule
import time
import logging
import psutil
from datetime import datetime
logging.basicConfig(
filename='/var/log/maintenance_daemon.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def collect_system_metrics():
"""Sistem metriklerini topla ve logla"""
metrics = {
'timestamp': datetime.now().isoformat(),
'cpu_percent': psutil.cpu_percent(interval=1),
'memory_percent': psutil.virtual_memory().percent,
'disk_root_percent': psutil.disk_usage('/').percent,
'load_avg': psutil.getloadavg(),
'active_connections': len(psutil.net_connections(kind='tcp'))
}
logging.info(
f"Metrikler - CPU:%{metrics['cpu_percent']} "
f"RAM:%{metrics['memory_percent']} "
f"Disk:%{metrics['disk_root_percent']} "
f"Load:{metrics['load_avg'][0]:.2f}"
)
# Kritik esikleri kontrol et
if metrics['cpu_percent'] > 95:
logging.warning(f"YUKSEK CPU: %{metrics['cpu_percent']}")
if metrics['memory_percent'] > 90:
logging.warning(f"YUKSEK RAM: %{metrics['memory_percent']}")
def clear_temp_files():
"""Gecici dosyalari temizle"""
import glob
import os
patterns = ['/tmp/*.tmp', '/tmp/sess_*']
count = 0
for pattern in patterns:
for f in glob.glob(pattern):
try:
if os.path.isfile(f):
os.unlink(f)
count += 1
except PermissionError:
pass
if count > 0:
logging.info(f"Temizlendi: {count} gecici dosya")
def weekly_report():
"""Haftalik ozet rapor olustur"""
logging.info("Haftalik rapor olusturuluyor...")
# Burada rapor mantigi olacak
# Zamanlama ayarlari
schedule.every(5).minutes.do(collect_system_metrics)
schedule.every(1).hours.do(clear_temp_files)
schedule.every().monday.at("08:00").do(weekly_report)
logging.info("Maintenance daemon baslatildi")
if __name__ == '__main__':
while True:
schedule.run_pending()
time.sleep(60) # Her dakika kontrol et
Hata Yönetimi ve Best Practices
Üretim ortamında çalışacak scriptler için birkaç kritik kural var:
- Her zaman logging kullanın:
print()değil,loggingmodülü. Log seviyeleri (DEBUG, INFO, WARNING, ERROR, CRITICAL) anlam ifade ediyor - Exception handling yapın: Bare
except:kullanmayın, spesifik exception tiplerini yakalayın - Idempotent yazın: Script iki kez çalışsa bile sorun çıkarmamalı
- Dry-run modu ekleyin:
--dry-runparametresiyle ne yapacağını göster ama yapma - Konfigürasyonu koddan ayırın: Hardcoded değerler yerine config dosyası veya environment variable kullanın
- Root ile çalışmaktan kaçının: Gerekli minimum izni kullanın
- Script çıkış kodlarına dikkat edin: Başarıda 0, hatada sıfırdan farklı değer dönün
# Script'i test ortamında calistirmadan once syntax kontrolu
python3 -m py_compile disk_monitor.py && echo "Syntax OK"
# Linting - PEP8 uyum kontrolu
pip install flake8
flake8 disk_monitor.py
# Tip kontrolu (opsiyonel ama guzel)
pip install mypy
mypy disk_monitor.py
# Scripti cron'a ekle
crontab -e
# */15 * * * * /opt/scripts/sysadmin-env/bin/python3 /opt/scripts/disk_monitor.py >> /var/log/disk_monitor_cron.log 2>&1
Sonuç
Python ile sistem otomasyonu, başta zaman gerektiren bir yatırım gibi görünse de birkaç hafta içinde bu zamanı fazlasıyla geri kazanıyorsunuz. Şu ana kadar anlattıklarımı özetlersem:
psutilile sistem kaynaklarını okumak çok kolay ve cross-platform çalışıyorsubprocessmodülü sistem komutlarını güvenli şekilde çalıştırmanızı sağlıyor,shell=Falsekullanmayı unutmayınpathlibile dosya sistemi işlemleri çok daha temiz yazılıyorschedulekütüphanesi basit daemon’lar için cron’dan daha esnek bir alternatif- Logging, hata yönetimi ve konfigürasyon yönetimi üretim kalitesi scriptler için olmazsa olmaz
Başlangıç için önerim şu: önce kendi ortamınızdaki tekrarlayan bir görevi belirleyin. Belki her sabah 5 farklı sunucudaki disk kullanımını kontrol etmek, belki eski backup dosyalarını temizlemek. O görevi Python ile yazın. Çalışıyor mu? İyi. Şimdi hata yönetimi ekleyin, logging ekleyin, config dosyasına taşıyın. Her adımda öğreneceksiniz.
Bir uyarı: script yazmak bağımlılık yapar. Birkaç ay sonra kendinizi her şeyi otomatize etmeye çalışırken bulabilirsiniz. Bu aslında iyi bir şey, sysadmin’in temel içgüdüsü bu zaten. Tembel olmak için çok çalışmak.