Python ile Otomasyon: Tekrarlayan Görevleri Otomatikleştirin

Sistem yöneticiliğinde en çok vakit çalan şey nedir diye sorsanız, cevap çoğunlukla aynı olur: tekrar eden işler. Her sabah aynı log dosyalarını kontrol etmek, disk kullanımını elle izlemek, kullanıcı hesaplarını tek tek oluşturmak… Bunların hepsini elle yapmak hem yorucu hem de hata yapmaya açık. Python bu noktada hayat kurtarıcı oluyor. Bash script yazabilirsiniz elbette, ama Python’un kütüphane zenginliği ve okunabilirliği, özellikle karmaşık otomasyon senaryolarında fark yaratıyor.

Bu yazıda gerçek dünyada karşılaştığım sorunları ve bunları Python ile nasıl çözdüğümü anlatacağım. Teorik değil, doğrudan üretim ortamında kullandığım veya kullandığım şeylere çok yakın örnekler bunlar.

Neden Python, Neden Şimdi

Bash scriptleri hâlâ geçerli ve güçlü. Ama bir noktadan sonra yetersiz kalmaya başlıyor. API çağrıları yapmanız gerektiğinde, JSON parse etmeniz gerektiğinde, veritabanına yazmanız gerektiğinde Bash ile uğraşmak vakit kaybı. Python bu tür işleri çok daha temiz hallediyor.

Ayrıca şunu da söyleyeyim: Python öğrenmek için yazılımcı olmak gerekmiyor. Sistem yöneticisi bakış açısıyla bakarsanız, script yazma mantığını zaten biliyorsunuzdur. Python sadece o mantığı daha güçlü bir araçla ifade etmenizi sağlıyor.

Başlamadan önce standart bir sanal ortam kuralım:

python3 -m venv /opt/sysadmin-tools/venv
source /opt/sysadmin-tools/venv/bin/activate
pip install requests paramiko psutil schedule python-dotenv

Disk Kullanımı İzleme ve Uyarı Sistemi

En klasik senaryo ile başlayalım. Disk doluyor, kimse fark etmiyor, servis çöküyor. Bu döngüyü kırmak için basit ama etkili bir script:

#!/usr/bin/env python3
import psutil
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
import os
from dotenv import load_dotenv

load_dotenv()

ESIK_YUZDE = 85
SMTP_SERVER = os.getenv("SMTP_SERVER")
SMTP_PORT = int(os.getenv("SMTP_PORT", 587))
SMTP_USER = os.getenv("SMTP_USER")
SMTP_PASS = os.getenv("SMTP_PASS")
ALICI_LISTE = os.getenv("ALICI_LISTE", "").split(",")

def disk_kontrol():
    uyarilar = []
    partitions = psutil.disk_partitions()
    
    for partition in partitions:
        try:
            kullanim = psutil.disk_usage(partition.mountpoint)
            yuzde = kullanim.percent
            
            if yuzde >= ESIK_YUZDE:
                uyari = {
                    "mountpoint": partition.mountpoint,
                    "device": partition.device,
                    "toplam_gb": round(kullanim.total / (1024**3), 2),
                    "kullanilan_gb": round(kullanim.used / (1024**3), 2),
                    "yuzde": yuzde
                }
                uyarilar.append(uyari)
        except PermissionError:
            continue
    
    return uyarilar

def mail_gonder(uyarilar):
    if not uyarilar:
        return
    
    hostname = os.uname().nodename
    konu = f"[UYARI] {hostname} - Disk Kullanimi Kritik Seviyede"
    
    govde = f"Sunucu: {hostname}nTarih: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}nn"
    govde += "Kritik Disk Bolümleri:n" + "-"*40 + "n"
    
    for uyari in uyarilar:
        govde += f"nBolüm: {uyari['mountpoint']} ({uyari['device']})n"
        govde += f"Kullanim: %{uyari['yuzde']} - {uyari['kullanilan_gb']} GB / {uyari['toplam_gb']} GBn"
    
    msg = MIMEMultipart()
    msg["From"] = SMTP_USER
    msg["To"] = ", ".join(ALICI_LISTE)
    msg["Subject"] = konu
    msg.attach(MIMEText(govde, "plain", "utf-8"))
    
    with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
        server.starttls()
        server.login(SMTP_USER, SMTP_PASS)
        server.sendmail(SMTP_USER, ALICI_LISTE, msg.as_string())
        print(f"Mail gönderildi: {len(uyarilar)} uyarı")

if __name__ == "__main__":
    uyarilar = disk_kontrol()
    mail_gonder(uyarilar)
    
    if not uyarilar:
        print("Tüm disk bölümleri normal seviyede.")

Bu scripti cron’a eklemek için:

# Her 30 dakikada bir çalıştır
*/30 * * * * /opt/sysadmin-tools/venv/bin/python3 /opt/sysadmin-tools/disk_monitor.py >> /var/log/disk_monitor.log 2>&1

Log Dosyası Analizi ve Raporlama

Nginx veya Apache loglarını elle okumak kaybedilen zamandır. Şu script belirli bir zaman aralığındaki hata oranlarını, en çok istek atan IP’leri ve yavaş yanıt sürelerini çıkarıyor:

#!/usr/bin/env python3
import re
from collections import Counter, defaultdict
from datetime import datetime, timedelta
import sys

LOG_DOSYASI = "/var/log/nginx/access.log"
ANALIZ_SAATI = 1  # Son kaç saati analiz et

# Nginx combined log format pattern
LOG_PATTERN = re.compile(
    r'(?P<ip>S+) S+ S+ [(?P<zaman>[^]]+)] '
    r'"(?P<metod>S+) (?P<url>S+) S+" '
    r'(?P<durum>d{3}) (?P<boyut>d+) '
    r'"[^"]*" "(?P<agent>[^"]*)"'
)

def log_analiz(dosya_yolu, saat_aralik):
    su_an = datetime.now()
    baslangic = su_an - timedelta(hours=saat_aralik)
    
    ip_sayac = Counter()
    durum_sayac = Counter()
    url_sayac = Counter()
    hata_listesi = []
    toplam_istek = 0
    
    with open(dosya_yolu, "r", encoding="utf-8", errors="ignore") as f:
        for satir in f:
            eslesen = LOG_PATTERN.match(satir)
            if not eslesen:
                continue
            
            zaman_str = eslesen.group("zaman").split()[0]
            try:
                zaman = datetime.strptime(zaman_str, "%d/%b/%Y:%H:%M:%S")
            except ValueError:
                continue
            
            if zaman < baslangic:
                continue
            
            toplam_istek += 1
            ip = eslesen.group("ip")
            durum = eslesen.group("durum")
            url = eslesen.group("url")
            
            ip_sayac[ip] += 1
            durum_sayac[durum] += 1
            
            if durum.startswith("4") or durum.startswith("5"):
                hata_listesi.append({
                    "zaman": zaman_str,
                    "ip": ip,
                    "url": url,
                    "durum": durum
                })
    
    return {
        "toplam_istek": toplam_istek,
        "ip_sayac": ip_sayac,
        "durum_sayac": durum_sayac,
        "hata_listesi": hata_listesi[:20],
        "hata_orani": round(len(hata_listesi) / max(toplam_istek, 1) * 100, 2)
    }

def rapor_yazdir(sonuc):
    print(f"n{'='*50}")
    print(f"NGINX LOG ANALİZİ - Son {ANALIZ_SAATI} Saat")
    print(f"{'='*50}")
    print(f"Toplam İstek: {sonuc['toplam_istek']}")
    print(f"Hata Oranı: %{sonuc['hata_orani']}")
    
    print(f"n-- HTTP Durum Kodları --")
    for durum, sayi in sorted(sonuc['durum_sayac'].items()):
        print(f"  {durum}: {sayi} istek")
    
    print(f"n-- En Çok İstek Atan 10 IP --")
    for ip, sayi in sonuc['ip_sayac'].most_common(10):
        print(f"  {ip}: {sayi} istek")
    
    print(f"n-- Son Hatalar (İlk 20) --")
    for hata in sonuc['hata_listesi']:
        print(f"  [{hata['zaman']}] {hata['durum']} - {hata['ip']} - {hata['url'][:60]}")

if __name__ == "__main__":
    sonuc = log_analiz(LOG_DOSYASI, ANALIZ_SAATI)
    rapor_yazdir(sonuc)

SSH ile Toplu Sunucu Yönetimi

Onlarca sunucuda aynı komutu çalıştırmak gerektiğinde Paramiko devreye giriyor. Örneğin tüm sunucularda belirli bir paketin versiyonunu kontrol etmek ya da aynı anda tüm sistemlere yama uygulamak:

#!/usr/bin/env python3
import paramiko
import concurrent.futures
from typing import List, Dict
import json

SUNUCU_LISTESI = [
    {"host": "web-01.sirket.local", "port": 22, "kullanici": "sysadmin"},
    {"host": "web-02.sirket.local", "port": 22, "kullanici": "sysadmin"},
    {"host": "db-01.sirket.local", "port": 2222, "kullanici": "sysadmin"},
    {"host": "cache-01.sirket.local", "port": 22, "kullanici": "sysadmin"},
]

SSH_KEY = "/home/sysadmin/.ssh/id_rsa"
KOMUT = "hostname && uptime && df -h / | tail -1"

def sunucuya_baglan_ve_calistir(sunucu_bilgi: Dict) -> Dict:
    sonuc = {
        "host": sunucu_bilgi["host"],
        "durum": "basarisiz",
        "cikti": "",
        "hata": ""
    }
    
    try:
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(
            hostname=sunucu_bilgi["host"],
            port=sunucu_bilgi["port"],
            username=sunucu_bilgi["kullanici"],
            key_filename=SSH_KEY,
            timeout=10
        )
        
        stdin, stdout, stderr = client.exec_command(KOMUT, timeout=30)
        cikti = stdout.read().decode("utf-8").strip()
        hata = stderr.read().decode("utf-8").strip()
        
        sonuc["durum"] = "basarili"
        sonuc["cikti"] = cikti
        sonuc["hata"] = hata
        client.close()
        
    except paramiko.AuthenticationException:
        sonuc["hata"] = "Kimlik doğrulama hatası"
    except paramiko.SSHException as e:
        sonuc["hata"] = f"SSH hatası: {str(e)}"
    except Exception as e:
        sonuc["hata"] = f"Bağlantı hatası: {str(e)}"
    
    return sonuc

def toplu_komut_calistir(sunucu_listesi: List[Dict], max_workers: int = 5):
    sonuclar = []
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        gelecekler = {
            executor.submit(sunucuya_baglan_ve_calistir, sunucu): sunucu
            for sunucu in sunucu_listesi
        }
        
        for gelecek in concurrent.futures.as_completed(gelecekler):
            sonuc = gelecek.result()
            sonuclar.append(sonuc)
            
            if sonuc["durum"] == "basarili":
                print(f"n[OK] {sonuc['host']}")
                print(f"     {sonuc['cikti'][:100]}")
            else:
                print(f"n[HATA] {sonuc['host']}: {sonuc['hata']}")
    
    return sonuclar

if __name__ == "__main__":
    print(f"Toplam {len(SUNUCU_LISTESI)} sunucuya bağlanılıyor...")
    sonuclar = toplu_komut_calistir(SUNUCU_LISTESI)
    
    basarili = sum(1 for s in sonuclar if s["durum"] == "basarili")
    print(f"n{'='*40}")
    print(f"Sonuç: {basarili}/{len(SUNUCU_LISTESI)} sunucu başarılı")

Bu scriptin paralel çalışması önemli. 50 sunucu varsa sıralı çalıştırmak dakikalar sürebilir, ThreadPoolExecutor ile saniyeler içinde tamamlanıyor.

Otomatik Yedekleme ve Rotasyon

Manuel yedek almak bir süre sonra ihmal ediliyor. Şu script belirli dizinleri sıkıştırıp uzak bir konuma kopyalıyor ve eski yedekleri temizliyor:

#!/usr/bin/env python3
import os
import tarfile
import shutil
from datetime import datetime, timedelta
from pathlib import Path
import hashlib
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("/var/log/yedekleme.log"),
        logging.StreamHandler()
    ]
)
log = logging.getLogger(__name__)

YEDEK_KAYNAKLARI = [
    "/etc/nginx",
    "/etc/ssl/certs",
    "/var/www/html",
    "/home/deploy/.ssh"
]

YEDEK_HEDEF = "/backup/gunluk"
UZAK_HEDEF = "[email protected]:/volume1/sunucu-yedek"
SAKLANACAK_GUN = 30

def md5_hesapla(dosya_yolu: str) -> str:
    hash_md5 = hashlib.md5()
    with open(dosya_yolu, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()

def arsiv_olustur(kaynak_listesi: list, hedef_dizin: str) -> str:
    tarih = datetime.now().strftime("%Y%m%d_%H%M%S")
    hostname = os.uname().nodename
    arsiv_adi = f"{hostname}_{tarih}.tar.gz"
    arsiv_yolu = os.path.join(hedef_dizin, arsiv_adi)
    
    os.makedirs(hedef_dizin, exist_ok=True)
    
    log.info(f"Arşiv oluşturuluyor: {arsiv_yolu}")
    
    with tarfile.open(arsiv_yolu, "w:gz") as tar:
        for kaynak in kaynak_listesi:
            if os.path.exists(kaynak):
                tar.add(kaynak, arcname=kaynak.lstrip("/"))
                log.info(f"  Eklendi: {kaynak}")
            else:
                log.warning(f"  Bulunamadı, atlandı: {kaynak}")
    
    boyut_mb = os.path.getsize(arsiv_yolu) / (1024 * 1024)
    checksum = md5_hesapla(arsiv_yolu)
    
    checksum_dosya = arsiv_yolu + ".md5"
    with open(checksum_dosya, "w") as f:
        f.write(f"{checksum}  {arsiv_adi}n")
    
    log.info(f"Arşiv tamamlandı: {boyut_mb:.2f} MB, MD5: {checksum}")
    return arsiv_yolu

def eski_yedekleri_temizle(hedef_dizin: str, gun_siniri: int):
    sinir_tarihi = datetime.now() - timedelta(days=gun_siniri)
    silinen = 0
    
    for dosya in Path(hedef_dizin).glob("*.tar.gz"):
        dosya_tarihi = datetime.fromtimestamp(dosya.stat().st_mtime)
        if dosya_tarihi < sinir_tarihi:
            dosya.unlink()
            md5_dosya = Path(str(dosya) + ".md5")
            if md5_dosya.exists():
                md5_dosya.unlink()
            silinen += 1
            log.info(f"Silindi: {dosya.name}")
    
    log.info(f"Temizleme tamamlandı: {silinen} eski yedek silindi")

def uzaga_kopyala(yerel_dosya: str, uzak_hedef: str):
    komut = f"rsync -avz --progress {yerel_dosya} {uzak_hedef}/"
    donus_kodu = os.system(komut)
    if donus_kodu == 0:
        log.info(f"Uzak kopyalama başarılı: {uzak_hedef}")
    else:
        log.error(f"Uzak kopyalama başarısız! Dönüş kodu: {donus_kodu}")

if __name__ == "__main__":
    log.info("Yedekleme başlatılıyor...")
    
    arsiv = arsiv_olustur(YEDEK_KAYNAKLARI, YEDEK_HEDEF)
    uzaga_kopyala(arsiv, UZAK_HEDEF)
    eski_yedekleri_temizle(YEDEK_HEDEF, SAKLANACAK_GUN)
    
    log.info("Yedekleme tamamlandı.")

Kullanıcı Hesabı Toplu Oluşturma

Yeni çalışan geldiğinde sistem hesabı açmak, grup eklemek, home dizini oluşturmak… Bunların hepsini bir CSV’den okuyup otomatik yapan script:

#!/usr/bin/env python3
import csv
import subprocess
import secrets
import string
import sys
import logging
from pathlib import Path

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

VARSAYILAN_GRUPLAR = ["sudo", "docker"]
SIFRE_UZUNLUGU = 16
CIKTI_DOSYASI = "/root/yeni_kullanici_bilgileri.csv"

def guclu_sifre_uret(uzunluk: int = 16) -> str:
    alfabe = string.ascii_letters + string.digits + "!@#$%^&*"
    return "".join(secrets.choice(alfabe) for _ in range(uzunluk))

def kullanici_var_mi(kullanici_adi: str) -> bool:
    sonuc = subprocess.run(
        ["id", kullanici_adi],
        capture_output=True, text=True
    )
    return sonuc.returncode == 0

def kullanici_olustur(kullanici_adi: str, tam_ad: str, gruplar: list) -> str:
    if kullanici_var_mi(kullanici_adi):
        log.warning(f"Kullanıcı zaten mevcut: {kullanici_adi}")
        return None
    
    komut = [
        "useradd",
        "-m",
        "-s", "/bin/bash",
        "-c", tam_ad,
        kullanici_adi
    ]
    
    sonuc = subprocess.run(komut, capture_output=True, text=True)
    if sonuc.returncode != 0:
        log.error(f"Kullanıcı oluşturulamadı {kullanici_adi}: {sonuc.stderr}")
        return None
    
    sifre = guclu_sifre_uret(SIFRE_UZUNLUGU)
    sifre_komutu = f"echo '{kullanici_adi}:{sifre}' | chpasswd"
    os.system(sifre_komutu)
    
    subprocess.run(["chage", "-d", "0", kullanici_adi])
    
    for grup in gruplar:
        subprocess.run(["usermod", "-aG", grup, kullanici_adi], capture_output=True)
    
    ssh_dir = Path(f"/home/{kullanici_adi}/.ssh")
    ssh_dir.mkdir(mode=0o700, exist_ok=True)
    subprocess.run(["chown", "-R", f"{kullanici_adi}:{kullanici_adi}", str(ssh_dir)])
    
    log.info(f"Kullanıcı oluşturuldu: {kullanici_adi} ({tam_ad})")
    return sifre

def csv_den_kullanici_yukle(csv_dosyasi: str):
    sonuclar = []
    
    with open(csv_dosyasi, "r", encoding="utf-8") as f:
        okuyucu = csv.DictReader(f)
        for satir in okuyucu:
            kullanici_adi = satir.get("kullanici_adi", "").strip().lower()
            tam_ad = satir.get("tam_ad", "").strip()
            ekstra_gruplar = satir.get("gruplar", "").split("|")
            
            if not kullanici_adi:
                continue
            
            gruplar = VARSAYILAN_GRUPLAR + [g.strip() for g in ekstra_gruplar if g.strip()]
            sifre = kullanici_olustur(kullanici_adi, tam_ad, gruplar)
            
            if sifre:
                sonuclar.append({
                    "kullanici_adi": kullanici_adi,
                    "tam_ad": tam_ad,
                    "sifre": sifre,
                    "gruplar": ",".join(gruplar)
                })
    
    if sonuclar:
        with open(CIKTI_DOSYASI, "w", newline="", encoding="utf-8") as f:
            alanllar = ["kullanici_adi", "tam_ad", "sifre", "gruplar"]
            yazar = csv.DictWriter(f, fieldnames=alanllar)
            yazar.writeheader()
            yazar.writerows(sonuclar)
        
        os.chmod(CIKTI_DOSYASI, 0o600)
        log.info(f"Bilgiler kaydedildi: {CIKTI_DOSYASI}")
    
    return sonuclar

if __name__ == "__main__":
    import os
    if os.geteuid() != 0:
        print("Bu script root yetkisi gerektiriyor.")
        sys.exit(1)
    
    if len(sys.argv) < 2:
        print("Kullanım: python3 kullanici_olustur.py kullanicilar.csv")
        sys.exit(1)
    
    sonuclar = csv_den_kullanici_yukle(sys.argv[1])
    print(f"nToplam {len(sonuclar)} kullanıcı başarıyla oluşturuldu.")

CSV formatı şöyle olmalı:

  • kullanici_adi: sistem kullanıcı adı
  • tam_ad: görünen isim
  • gruplar: pipe ile ayrılmış ekstra gruplar (dev|git gibi)

Servis Sağlığı Kontrol Scripti

HTTP endpoint’lerini, port’ları ve process durumlarını düzenli kontrol edip sonuçları bir JSON dosyasına yazan bu script, basit bir monitoring altyapısının temelini oluşturuyor:

#!/usr/bin/env python3
import requests
import socket
import subprocess
import json
from datetime import datetime
import time

KONTROL_LISTESI = [
    {"isim": "Nginx Web", "tip": "http", "url": "http://localhost/health", "timeout": 5},
    {"isim": "PostgreSQL", "tip": "port", "host": "localhost", "port": 5432, "timeout": 3},
    {"isim": "Redis", "tip": "port", "host": "localhost", "port": 6379, "timeout": 3},
    {"isim": "Celery Worker", "tip": "process", "process_adi": "celery"},
    {"isim": "Elasticsearch", "tip": "http", "url": "http://localhost:9200/_cluster/health", "timeout": 5},
]

DURUM_DOSYASI = "/var/log/servis_durumu.json"
ARDISIK_HATA_ESIGI = 3
hata_sayaci = {}

def http_kontrol(url: str, timeout: int) -> dict:
    try:
        baslangic = time.time()
        yanit = requests.get(url, timeout=timeout, verify=False)
        sure = round((time.time() - baslangic) * 1000)
        
        return {
            "durum": "aktif" if yanit.status_code < 400 else "hata",
            "http_kodu": yanit.status_code,
            "yanit_suresi_ms": sure
        }
    except requests.exceptions.ConnectionError:
        return {"durum": "baglanti_hatasi", "http_kodu": None, "yanit_suresi_ms": None}
    except requests.exceptions.Timeout:
        return {"durum": "zaman_asimi", "http_kodu": None, "yanit_suresi_ms": None}

def port_kontrol(host: str, port: int, timeout: int) -> dict:
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        baslangic = time.time()
        sonuc = sock.connect_ex((host, port))
        sure = round((time.time() - baslangic) * 1000)
        sock.close()
        
        return {
            "durum": "aktif" if sonuc == 0 else "kapali",
            "yanit_suresi_ms": sure if sonuc == 0 else None
        }
    except Exception as e:
        return {"durum": "hata", "mesaj": str(e)}

def process_kontrol(process_adi: str) -> dict:
    sonuc = subprocess.run(
        ["pgrep", "-f", process_adi],
        capture_output=True, text=True
    )
    pid_listesi = sonuc.stdout.strip().split("n") if sonuc.stdout.strip() else []
    
    return {
        "durum": "aktif" if pid_listesi else "calismiyor",
        "pid_sayisi": len(pid_listesi)
    }

def tum_servisleri_kontrol_et():
    sonuclar = []
    zaman = datetime.now().isoformat()
    
    for servis in KONTROL_LISTESI:
        isim = servis["isim"]
        
        if servis["tip"] == "http":
            kontrol = http_kontrol(servis["url"], servis["timeout"])
        elif servis["tip"] == "port":
            kontrol = port_kontrol(servis["host"], servis["port"], servis["timeout"])
        elif servis["tip"] == "process":
            kontrol = process_kontrol(servis["process_adi"])
        else:
            kontrol = {"durum": "bilinmeyen_tip"}
        
        kontrol["isim"] = isim
        kontrol["zaman"] = zaman
        
        if kontrol["durum"] != "aktif":
            hata_sayaci[isim] = hata_sayaci.get(isim, 0) + 1
            if hata_sayaci[isim] >= ARDISIK_HATA_ESIGI:
                print(f"[KRITİK] {isim} - {hata_sayaci[isim]} ardışık hata! Durum: {kontrol['durum']}")
        else:
            hata_sayaci[isim] = 0
        
        sonuclar.append(kontrol)
        durum_ikonu = "OK" if kontrol["durum"] == "aktif" else "!!"
        print(f"[{durum_ikonu}] {isim}: {kontrol['durum']}")
    
    with open(DURUM_DOSYASI, "w") as f:
        json.dump({"kontrol_zamani": zaman, "servisler": sonuclar}, f, indent=2)
    
    return sonuclar

if __name__ == "__main__":
    tum_servisleri_kontrol_et()

Projeleri Bir Arada Tutmak: Schedule ile Görev Planlayıcı

Tüm bu scriptleri ayrı ayrı cron’a eklemek yerine, tek bir scheduler süreci çalıştırmak bazen daha temiz bir çözüm oluyor. Özellikle Docker container içinde çalışırken cron yönetimi can sıkıcı hale gelebiliyor:

#!/usr/bin/env python3
import schedule
import time
import logging
import subprocess
import sys
from datetime import datetime

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("/var/log/gorev_planlayici.log")
    ]
)
log = logging.getLogger("planlayici")

PYTHON = "/opt/sysadmin-tools/venv/bin/python3"
SCRIPT_DIZIN = "/opt/sysadmin-tools"

def script_calistir(script_adi: str, arglar: list = None):
    def _calistir():
        tam_yol = f"{SCRIPT_DIZIN}/{script_adi}"
        komut = [PYTHON, tam_yol] + (arglar or [])
        
        log.info(f"Görev başlatılıyor: {script_adi}")
        try:
            sonuc = subprocess.run(
                komut,
                capture_output=True,
                text=True,
                timeout=300
            )
            if sonuc.returncode == 0:
                log.info(f"Görev tamamlandı: {script_adi}")
            else:
                log.error(f"Görev hata verdi: {script_adi}n{sonuc.stderr[:500]}")
        except subprocess.TimeoutExpired:
            log.error(f"Görev zaman aşımı: {script_adi}")
    
    return _calistir

schedule.every(30).minutes.do(script_calistir("disk_monitor.py"))
schedule.every(1).hours.do(script_calistir("log_analiz.py"))
schedule.every().day.at("02:00").do(script_calistir("yedekleme.py"))
schedule.every(5).minutes.do(script_calistir("servis_kontrol.py"))

log.info("Görev planlayıcı başlatıldı.")
log.info(f"Tanımlı görev sayısı: {len(schedule.jobs)}")

while True:
    schedule.run_pending()
    time.sleep(60)

Bunu bir systemd servisi olarak çalıştırmak için /etc/systemd/system/sysadmin-planlayici.service dosyası oluşturun:

[Unit]
Description=Sysadmin Görev Planlayıcı
After=network.target

[Service]
Type=simple
User=root
WorkingDirectory=/opt/sysadmin-tools
ExecStart=/opt/sysadmin-tools/venv/bin/python3 /opt/sysadmin-tools/planlayici.py
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target
systemctl daemon-reload
systemctl enable --now sysadmin-planlayici
systemctl status sysadmin-planlayici

Dikkat Edilmesi Gereken Noktalar

Güvenlik konusunda birkaç önemli nokta var. Öncelikle:

  • Kimlik bilgilerini koda gömmek yerine .env dosyası veya HashiCorp Vault gibi araçlar kullanın
  • Script’leri doğrudan root olarak çalıştırmak yerine sudo ile belirli komutlara yetki verin
  • Log dosyalarına hassas bilgi yazmaktan kaçının, özellikle şifreler ve API anahtarları
  • Üretim ortamında test etmeden önce staging ortamında deneyin, özellikle toplu işlemler için
  • Her script için timeout değeri belirleyin, sonsuza kadar bekleyen bir script gece boyunca kaynakları tüketebilir

Hata yönetimi de kritik. Her external bağlantı, her dosya işlemi try-except bloğu içinde olmalı. Sessizce çöken bir otomasyon script’i, hiç yazılmamış olan’dan daha tehlikeli.

Sonuç

Python ile otomasyon başlangıçta biraz yatırım gerektiriyor: script yazmak, test etmek, monitoring’ini kurmak. Ama sonrasında kazanılan vakit ve azalan hata oranı bu yatırımı çok kısa sürede amorti ediyor.

Burada anlattığım scriptlerin hepsini sıfırdan yazmak zorunda değilsiniz. Kendi ortamınıza göre adapte edin. Önemli olan başlamak; küçük bir disk monitoring script’iyle bile olsa. Zamanla her tekrar eden görevi Python ile çözmeye başladığınızda, geriye dönüp “bunu neden daha önce yapmadım” diye soracaksınız.

Bir uyarı ile bitireyim: otomasyonun kendisini de izleyin. Otomasyon script’lerinizin çalışıp çalışmadığını, ne zaman son çalıştığını takip etmeyen sistemlerde sessiz hatalar birikerek büyük sorunlara dönüşebiliyor. Script’lerinizin son çalışma zamanını ve durumunu merkezi bir yerde tutun, üzerine alert ekleyin.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir