Python ile Otomasyon: Tekrarlayan Görevleri Otomatikleştirin
Sistem yöneticiliğinde en çok vakit çalan şey nedir diye sorsanız, cevap çoğunlukla aynı olur: tekrar eden işler. Her sabah aynı log dosyalarını kontrol etmek, disk kullanımını elle izlemek, kullanıcı hesaplarını tek tek oluşturmak… Bunların hepsini elle yapmak hem yorucu hem de hata yapmaya açık. Python bu noktada hayat kurtarıcı oluyor. Bash script yazabilirsiniz elbette, ama Python’un kütüphane zenginliği ve okunabilirliği, özellikle karmaşık otomasyon senaryolarında fark yaratıyor.
Bu yazıda gerçek dünyada karşılaştığım sorunları ve bunları Python ile nasıl çözdüğümü anlatacağım. Teorik değil, doğrudan üretim ortamında kullandığım veya kullandığım şeylere çok yakın örnekler bunlar.
Neden Python, Neden Şimdi
Bash scriptleri hâlâ geçerli ve güçlü. Ama bir noktadan sonra yetersiz kalmaya başlıyor. API çağrıları yapmanız gerektiğinde, JSON parse etmeniz gerektiğinde, veritabanına yazmanız gerektiğinde Bash ile uğraşmak vakit kaybı. Python bu tür işleri çok daha temiz hallediyor.
Ayrıca şunu da söyleyeyim: Python öğrenmek için yazılımcı olmak gerekmiyor. Sistem yöneticisi bakış açısıyla bakarsanız, script yazma mantığını zaten biliyorsunuzdur. Python sadece o mantığı daha güçlü bir araçla ifade etmenizi sağlıyor.
Başlamadan önce standart bir sanal ortam kuralım:
python3 -m venv /opt/sysadmin-tools/venv
source /opt/sysadmin-tools/venv/bin/activate
pip install requests paramiko psutil schedule python-dotenv
Disk Kullanımı İzleme ve Uyarı Sistemi
En klasik senaryo ile başlayalım. Disk doluyor, kimse fark etmiyor, servis çöküyor. Bu döngüyü kırmak için basit ama etkili bir script:
#!/usr/bin/env python3
import psutil
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
import os
from dotenv import load_dotenv
load_dotenv()
ESIK_YUZDE = 85
SMTP_SERVER = os.getenv("SMTP_SERVER")
SMTP_PORT = int(os.getenv("SMTP_PORT", 587))
SMTP_USER = os.getenv("SMTP_USER")
SMTP_PASS = os.getenv("SMTP_PASS")
ALICI_LISTE = os.getenv("ALICI_LISTE", "").split(",")
def disk_kontrol():
uyarilar = []
partitions = psutil.disk_partitions()
for partition in partitions:
try:
kullanim = psutil.disk_usage(partition.mountpoint)
yuzde = kullanim.percent
if yuzde >= ESIK_YUZDE:
uyari = {
"mountpoint": partition.mountpoint,
"device": partition.device,
"toplam_gb": round(kullanim.total / (1024**3), 2),
"kullanilan_gb": round(kullanim.used / (1024**3), 2),
"yuzde": yuzde
}
uyarilar.append(uyari)
except PermissionError:
continue
return uyarilar
def mail_gonder(uyarilar):
if not uyarilar:
return
hostname = os.uname().nodename
konu = f"[UYARI] {hostname} - Disk Kullanimi Kritik Seviyede"
govde = f"Sunucu: {hostname}nTarih: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}nn"
govde += "Kritik Disk Bolümleri:n" + "-"*40 + "n"
for uyari in uyarilar:
govde += f"nBolüm: {uyari['mountpoint']} ({uyari['device']})n"
govde += f"Kullanim: %{uyari['yuzde']} - {uyari['kullanilan_gb']} GB / {uyari['toplam_gb']} GBn"
msg = MIMEMultipart()
msg["From"] = SMTP_USER
msg["To"] = ", ".join(ALICI_LISTE)
msg["Subject"] = konu
msg.attach(MIMEText(govde, "plain", "utf-8"))
with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
server.starttls()
server.login(SMTP_USER, SMTP_PASS)
server.sendmail(SMTP_USER, ALICI_LISTE, msg.as_string())
print(f"Mail gönderildi: {len(uyarilar)} uyarı")
if __name__ == "__main__":
uyarilar = disk_kontrol()
mail_gonder(uyarilar)
if not uyarilar:
print("Tüm disk bölümleri normal seviyede.")
Bu scripti cron’a eklemek için:
# Her 30 dakikada bir çalıştır
*/30 * * * * /opt/sysadmin-tools/venv/bin/python3 /opt/sysadmin-tools/disk_monitor.py >> /var/log/disk_monitor.log 2>&1
Log Dosyası Analizi ve Raporlama
Nginx veya Apache loglarını elle okumak kaybedilen zamandır. Şu script belirli bir zaman aralığındaki hata oranlarını, en çok istek atan IP’leri ve yavaş yanıt sürelerini çıkarıyor:
#!/usr/bin/env python3
import re
from collections import Counter, defaultdict
from datetime import datetime, timedelta
import sys
LOG_DOSYASI = "/var/log/nginx/access.log"
ANALIZ_SAATI = 1 # Son kaç saati analiz et
# Nginx combined log format pattern
LOG_PATTERN = re.compile(
r'(?P<ip>S+) S+ S+ [(?P<zaman>[^]]+)] '
r'"(?P<metod>S+) (?P<url>S+) S+" '
r'(?P<durum>d{3}) (?P<boyut>d+) '
r'"[^"]*" "(?P<agent>[^"]*)"'
)
def log_analiz(dosya_yolu, saat_aralik):
su_an = datetime.now()
baslangic = su_an - timedelta(hours=saat_aralik)
ip_sayac = Counter()
durum_sayac = Counter()
url_sayac = Counter()
hata_listesi = []
toplam_istek = 0
with open(dosya_yolu, "r", encoding="utf-8", errors="ignore") as f:
for satir in f:
eslesen = LOG_PATTERN.match(satir)
if not eslesen:
continue
zaman_str = eslesen.group("zaman").split()[0]
try:
zaman = datetime.strptime(zaman_str, "%d/%b/%Y:%H:%M:%S")
except ValueError:
continue
if zaman < baslangic:
continue
toplam_istek += 1
ip = eslesen.group("ip")
durum = eslesen.group("durum")
url = eslesen.group("url")
ip_sayac[ip] += 1
durum_sayac[durum] += 1
if durum.startswith("4") or durum.startswith("5"):
hata_listesi.append({
"zaman": zaman_str,
"ip": ip,
"url": url,
"durum": durum
})
return {
"toplam_istek": toplam_istek,
"ip_sayac": ip_sayac,
"durum_sayac": durum_sayac,
"hata_listesi": hata_listesi[:20],
"hata_orani": round(len(hata_listesi) / max(toplam_istek, 1) * 100, 2)
}
def rapor_yazdir(sonuc):
print(f"n{'='*50}")
print(f"NGINX LOG ANALİZİ - Son {ANALIZ_SAATI} Saat")
print(f"{'='*50}")
print(f"Toplam İstek: {sonuc['toplam_istek']}")
print(f"Hata Oranı: %{sonuc['hata_orani']}")
print(f"n-- HTTP Durum Kodları --")
for durum, sayi in sorted(sonuc['durum_sayac'].items()):
print(f" {durum}: {sayi} istek")
print(f"n-- En Çok İstek Atan 10 IP --")
for ip, sayi in sonuc['ip_sayac'].most_common(10):
print(f" {ip}: {sayi} istek")
print(f"n-- Son Hatalar (İlk 20) --")
for hata in sonuc['hata_listesi']:
print(f" [{hata['zaman']}] {hata['durum']} - {hata['ip']} - {hata['url'][:60]}")
if __name__ == "__main__":
sonuc = log_analiz(LOG_DOSYASI, ANALIZ_SAATI)
rapor_yazdir(sonuc)
SSH ile Toplu Sunucu Yönetimi
Onlarca sunucuda aynı komutu çalıştırmak gerektiğinde Paramiko devreye giriyor. Örneğin tüm sunucularda belirli bir paketin versiyonunu kontrol etmek ya da aynı anda tüm sistemlere yama uygulamak:
#!/usr/bin/env python3
import paramiko
import concurrent.futures
from typing import List, Dict
import json
SUNUCU_LISTESI = [
{"host": "web-01.sirket.local", "port": 22, "kullanici": "sysadmin"},
{"host": "web-02.sirket.local", "port": 22, "kullanici": "sysadmin"},
{"host": "db-01.sirket.local", "port": 2222, "kullanici": "sysadmin"},
{"host": "cache-01.sirket.local", "port": 22, "kullanici": "sysadmin"},
]
SSH_KEY = "/home/sysadmin/.ssh/id_rsa"
KOMUT = "hostname && uptime && df -h / | tail -1"
def sunucuya_baglan_ve_calistir(sunucu_bilgi: Dict) -> Dict:
sonuc = {
"host": sunucu_bilgi["host"],
"durum": "basarisiz",
"cikti": "",
"hata": ""
}
try:
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(
hostname=sunucu_bilgi["host"],
port=sunucu_bilgi["port"],
username=sunucu_bilgi["kullanici"],
key_filename=SSH_KEY,
timeout=10
)
stdin, stdout, stderr = client.exec_command(KOMUT, timeout=30)
cikti = stdout.read().decode("utf-8").strip()
hata = stderr.read().decode("utf-8").strip()
sonuc["durum"] = "basarili"
sonuc["cikti"] = cikti
sonuc["hata"] = hata
client.close()
except paramiko.AuthenticationException:
sonuc["hata"] = "Kimlik doğrulama hatası"
except paramiko.SSHException as e:
sonuc["hata"] = f"SSH hatası: {str(e)}"
except Exception as e:
sonuc["hata"] = f"Bağlantı hatası: {str(e)}"
return sonuc
def toplu_komut_calistir(sunucu_listesi: List[Dict], max_workers: int = 5):
sonuclar = []
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
gelecekler = {
executor.submit(sunucuya_baglan_ve_calistir, sunucu): sunucu
for sunucu in sunucu_listesi
}
for gelecek in concurrent.futures.as_completed(gelecekler):
sonuc = gelecek.result()
sonuclar.append(sonuc)
if sonuc["durum"] == "basarili":
print(f"n[OK] {sonuc['host']}")
print(f" {sonuc['cikti'][:100]}")
else:
print(f"n[HATA] {sonuc['host']}: {sonuc['hata']}")
return sonuclar
if __name__ == "__main__":
print(f"Toplam {len(SUNUCU_LISTESI)} sunucuya bağlanılıyor...")
sonuclar = toplu_komut_calistir(SUNUCU_LISTESI)
basarili = sum(1 for s in sonuclar if s["durum"] == "basarili")
print(f"n{'='*40}")
print(f"Sonuç: {basarili}/{len(SUNUCU_LISTESI)} sunucu başarılı")
Bu scriptin paralel çalışması önemli. 50 sunucu varsa sıralı çalıştırmak dakikalar sürebilir, ThreadPoolExecutor ile saniyeler içinde tamamlanıyor.
Otomatik Yedekleme ve Rotasyon
Manuel yedek almak bir süre sonra ihmal ediliyor. Şu script belirli dizinleri sıkıştırıp uzak bir konuma kopyalıyor ve eski yedekleri temizliyor:
#!/usr/bin/env python3
import os
import tarfile
import shutil
from datetime import datetime, timedelta
from pathlib import Path
import hashlib
import logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler("/var/log/yedekleme.log"),
logging.StreamHandler()
]
)
log = logging.getLogger(__name__)
YEDEK_KAYNAKLARI = [
"/etc/nginx",
"/etc/ssl/certs",
"/var/www/html",
"/home/deploy/.ssh"
]
YEDEK_HEDEF = "/backup/gunluk"
UZAK_HEDEF = "[email protected]:/volume1/sunucu-yedek"
SAKLANACAK_GUN = 30
def md5_hesapla(dosya_yolu: str) -> str:
hash_md5 = hashlib.md5()
with open(dosya_yolu, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def arsiv_olustur(kaynak_listesi: list, hedef_dizin: str) -> str:
tarih = datetime.now().strftime("%Y%m%d_%H%M%S")
hostname = os.uname().nodename
arsiv_adi = f"{hostname}_{tarih}.tar.gz"
arsiv_yolu = os.path.join(hedef_dizin, arsiv_adi)
os.makedirs(hedef_dizin, exist_ok=True)
log.info(f"Arşiv oluşturuluyor: {arsiv_yolu}")
with tarfile.open(arsiv_yolu, "w:gz") as tar:
for kaynak in kaynak_listesi:
if os.path.exists(kaynak):
tar.add(kaynak, arcname=kaynak.lstrip("/"))
log.info(f" Eklendi: {kaynak}")
else:
log.warning(f" Bulunamadı, atlandı: {kaynak}")
boyut_mb = os.path.getsize(arsiv_yolu) / (1024 * 1024)
checksum = md5_hesapla(arsiv_yolu)
checksum_dosya = arsiv_yolu + ".md5"
with open(checksum_dosya, "w") as f:
f.write(f"{checksum} {arsiv_adi}n")
log.info(f"Arşiv tamamlandı: {boyut_mb:.2f} MB, MD5: {checksum}")
return arsiv_yolu
def eski_yedekleri_temizle(hedef_dizin: str, gun_siniri: int):
sinir_tarihi = datetime.now() - timedelta(days=gun_siniri)
silinen = 0
for dosya in Path(hedef_dizin).glob("*.tar.gz"):
dosya_tarihi = datetime.fromtimestamp(dosya.stat().st_mtime)
if dosya_tarihi < sinir_tarihi:
dosya.unlink()
md5_dosya = Path(str(dosya) + ".md5")
if md5_dosya.exists():
md5_dosya.unlink()
silinen += 1
log.info(f"Silindi: {dosya.name}")
log.info(f"Temizleme tamamlandı: {silinen} eski yedek silindi")
def uzaga_kopyala(yerel_dosya: str, uzak_hedef: str):
komut = f"rsync -avz --progress {yerel_dosya} {uzak_hedef}/"
donus_kodu = os.system(komut)
if donus_kodu == 0:
log.info(f"Uzak kopyalama başarılı: {uzak_hedef}")
else:
log.error(f"Uzak kopyalama başarısız! Dönüş kodu: {donus_kodu}")
if __name__ == "__main__":
log.info("Yedekleme başlatılıyor...")
arsiv = arsiv_olustur(YEDEK_KAYNAKLARI, YEDEK_HEDEF)
uzaga_kopyala(arsiv, UZAK_HEDEF)
eski_yedekleri_temizle(YEDEK_HEDEF, SAKLANACAK_GUN)
log.info("Yedekleme tamamlandı.")
Kullanıcı Hesabı Toplu Oluşturma
Yeni çalışan geldiğinde sistem hesabı açmak, grup eklemek, home dizini oluşturmak… Bunların hepsini bir CSV’den okuyup otomatik yapan script:
#!/usr/bin/env python3
import csv
import subprocess
import secrets
import string
import sys
import logging
from pathlib import Path
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
VARSAYILAN_GRUPLAR = ["sudo", "docker"]
SIFRE_UZUNLUGU = 16
CIKTI_DOSYASI = "/root/yeni_kullanici_bilgileri.csv"
def guclu_sifre_uret(uzunluk: int = 16) -> str:
alfabe = string.ascii_letters + string.digits + "!@#$%^&*"
return "".join(secrets.choice(alfabe) for _ in range(uzunluk))
def kullanici_var_mi(kullanici_adi: str) -> bool:
sonuc = subprocess.run(
["id", kullanici_adi],
capture_output=True, text=True
)
return sonuc.returncode == 0
def kullanici_olustur(kullanici_adi: str, tam_ad: str, gruplar: list) -> str:
if kullanici_var_mi(kullanici_adi):
log.warning(f"Kullanıcı zaten mevcut: {kullanici_adi}")
return None
komut = [
"useradd",
"-m",
"-s", "/bin/bash",
"-c", tam_ad,
kullanici_adi
]
sonuc = subprocess.run(komut, capture_output=True, text=True)
if sonuc.returncode != 0:
log.error(f"Kullanıcı oluşturulamadı {kullanici_adi}: {sonuc.stderr}")
return None
sifre = guclu_sifre_uret(SIFRE_UZUNLUGU)
sifre_komutu = f"echo '{kullanici_adi}:{sifre}' | chpasswd"
os.system(sifre_komutu)
subprocess.run(["chage", "-d", "0", kullanici_adi])
for grup in gruplar:
subprocess.run(["usermod", "-aG", grup, kullanici_adi], capture_output=True)
ssh_dir = Path(f"/home/{kullanici_adi}/.ssh")
ssh_dir.mkdir(mode=0o700, exist_ok=True)
subprocess.run(["chown", "-R", f"{kullanici_adi}:{kullanici_adi}", str(ssh_dir)])
log.info(f"Kullanıcı oluşturuldu: {kullanici_adi} ({tam_ad})")
return sifre
def csv_den_kullanici_yukle(csv_dosyasi: str):
sonuclar = []
with open(csv_dosyasi, "r", encoding="utf-8") as f:
okuyucu = csv.DictReader(f)
for satir in okuyucu:
kullanici_adi = satir.get("kullanici_adi", "").strip().lower()
tam_ad = satir.get("tam_ad", "").strip()
ekstra_gruplar = satir.get("gruplar", "").split("|")
if not kullanici_adi:
continue
gruplar = VARSAYILAN_GRUPLAR + [g.strip() for g in ekstra_gruplar if g.strip()]
sifre = kullanici_olustur(kullanici_adi, tam_ad, gruplar)
if sifre:
sonuclar.append({
"kullanici_adi": kullanici_adi,
"tam_ad": tam_ad,
"sifre": sifre,
"gruplar": ",".join(gruplar)
})
if sonuclar:
with open(CIKTI_DOSYASI, "w", newline="", encoding="utf-8") as f:
alanllar = ["kullanici_adi", "tam_ad", "sifre", "gruplar"]
yazar = csv.DictWriter(f, fieldnames=alanllar)
yazar.writeheader()
yazar.writerows(sonuclar)
os.chmod(CIKTI_DOSYASI, 0o600)
log.info(f"Bilgiler kaydedildi: {CIKTI_DOSYASI}")
return sonuclar
if __name__ == "__main__":
import os
if os.geteuid() != 0:
print("Bu script root yetkisi gerektiriyor.")
sys.exit(1)
if len(sys.argv) < 2:
print("Kullanım: python3 kullanici_olustur.py kullanicilar.csv")
sys.exit(1)
sonuclar = csv_den_kullanici_yukle(sys.argv[1])
print(f"nToplam {len(sonuclar)} kullanıcı başarıyla oluşturuldu.")
CSV formatı şöyle olmalı:
- kullanici_adi: sistem kullanıcı adı
- tam_ad: görünen isim
- gruplar: pipe ile ayrılmış ekstra gruplar (dev|git gibi)
Servis Sağlığı Kontrol Scripti
HTTP endpoint’lerini, port’ları ve process durumlarını düzenli kontrol edip sonuçları bir JSON dosyasına yazan bu script, basit bir monitoring altyapısının temelini oluşturuyor:
#!/usr/bin/env python3
import requests
import socket
import subprocess
import json
from datetime import datetime
import time
KONTROL_LISTESI = [
{"isim": "Nginx Web", "tip": "http", "url": "http://localhost/health", "timeout": 5},
{"isim": "PostgreSQL", "tip": "port", "host": "localhost", "port": 5432, "timeout": 3},
{"isim": "Redis", "tip": "port", "host": "localhost", "port": 6379, "timeout": 3},
{"isim": "Celery Worker", "tip": "process", "process_adi": "celery"},
{"isim": "Elasticsearch", "tip": "http", "url": "http://localhost:9200/_cluster/health", "timeout": 5},
]
DURUM_DOSYASI = "/var/log/servis_durumu.json"
ARDISIK_HATA_ESIGI = 3
hata_sayaci = {}
def http_kontrol(url: str, timeout: int) -> dict:
try:
baslangic = time.time()
yanit = requests.get(url, timeout=timeout, verify=False)
sure = round((time.time() - baslangic) * 1000)
return {
"durum": "aktif" if yanit.status_code < 400 else "hata",
"http_kodu": yanit.status_code,
"yanit_suresi_ms": sure
}
except requests.exceptions.ConnectionError:
return {"durum": "baglanti_hatasi", "http_kodu": None, "yanit_suresi_ms": None}
except requests.exceptions.Timeout:
return {"durum": "zaman_asimi", "http_kodu": None, "yanit_suresi_ms": None}
def port_kontrol(host: str, port: int, timeout: int) -> dict:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
baslangic = time.time()
sonuc = sock.connect_ex((host, port))
sure = round((time.time() - baslangic) * 1000)
sock.close()
return {
"durum": "aktif" if sonuc == 0 else "kapali",
"yanit_suresi_ms": sure if sonuc == 0 else None
}
except Exception as e:
return {"durum": "hata", "mesaj": str(e)}
def process_kontrol(process_adi: str) -> dict:
sonuc = subprocess.run(
["pgrep", "-f", process_adi],
capture_output=True, text=True
)
pid_listesi = sonuc.stdout.strip().split("n") if sonuc.stdout.strip() else []
return {
"durum": "aktif" if pid_listesi else "calismiyor",
"pid_sayisi": len(pid_listesi)
}
def tum_servisleri_kontrol_et():
sonuclar = []
zaman = datetime.now().isoformat()
for servis in KONTROL_LISTESI:
isim = servis["isim"]
if servis["tip"] == "http":
kontrol = http_kontrol(servis["url"], servis["timeout"])
elif servis["tip"] == "port":
kontrol = port_kontrol(servis["host"], servis["port"], servis["timeout"])
elif servis["tip"] == "process":
kontrol = process_kontrol(servis["process_adi"])
else:
kontrol = {"durum": "bilinmeyen_tip"}
kontrol["isim"] = isim
kontrol["zaman"] = zaman
if kontrol["durum"] != "aktif":
hata_sayaci[isim] = hata_sayaci.get(isim, 0) + 1
if hata_sayaci[isim] >= ARDISIK_HATA_ESIGI:
print(f"[KRITİK] {isim} - {hata_sayaci[isim]} ardışık hata! Durum: {kontrol['durum']}")
else:
hata_sayaci[isim] = 0
sonuclar.append(kontrol)
durum_ikonu = "OK" if kontrol["durum"] == "aktif" else "!!"
print(f"[{durum_ikonu}] {isim}: {kontrol['durum']}")
with open(DURUM_DOSYASI, "w") as f:
json.dump({"kontrol_zamani": zaman, "servisler": sonuclar}, f, indent=2)
return sonuclar
if __name__ == "__main__":
tum_servisleri_kontrol_et()
Projeleri Bir Arada Tutmak: Schedule ile Görev Planlayıcı
Tüm bu scriptleri ayrı ayrı cron’a eklemek yerine, tek bir scheduler süreci çalıştırmak bazen daha temiz bir çözüm oluyor. Özellikle Docker container içinde çalışırken cron yönetimi can sıkıcı hale gelebiliyor:
#!/usr/bin/env python3
import schedule
import time
import logging
import subprocess
import sys
from datetime import datetime
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler("/var/log/gorev_planlayici.log")
]
)
log = logging.getLogger("planlayici")
PYTHON = "/opt/sysadmin-tools/venv/bin/python3"
SCRIPT_DIZIN = "/opt/sysadmin-tools"
def script_calistir(script_adi: str, arglar: list = None):
def _calistir():
tam_yol = f"{SCRIPT_DIZIN}/{script_adi}"
komut = [PYTHON, tam_yol] + (arglar or [])
log.info(f"Görev başlatılıyor: {script_adi}")
try:
sonuc = subprocess.run(
komut,
capture_output=True,
text=True,
timeout=300
)
if sonuc.returncode == 0:
log.info(f"Görev tamamlandı: {script_adi}")
else:
log.error(f"Görev hata verdi: {script_adi}n{sonuc.stderr[:500]}")
except subprocess.TimeoutExpired:
log.error(f"Görev zaman aşımı: {script_adi}")
return _calistir
schedule.every(30).minutes.do(script_calistir("disk_monitor.py"))
schedule.every(1).hours.do(script_calistir("log_analiz.py"))
schedule.every().day.at("02:00").do(script_calistir("yedekleme.py"))
schedule.every(5).minutes.do(script_calistir("servis_kontrol.py"))
log.info("Görev planlayıcı başlatıldı.")
log.info(f"Tanımlı görev sayısı: {len(schedule.jobs)}")
while True:
schedule.run_pending()
time.sleep(60)
Bunu bir systemd servisi olarak çalıştırmak için /etc/systemd/system/sysadmin-planlayici.service dosyası oluşturun:
[Unit]
Description=Sysadmin Görev Planlayıcı
After=network.target
[Service]
Type=simple
User=root
WorkingDirectory=/opt/sysadmin-tools
ExecStart=/opt/sysadmin-tools/venv/bin/python3 /opt/sysadmin-tools/planlayici.py
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
systemctl daemon-reload
systemctl enable --now sysadmin-planlayici
systemctl status sysadmin-planlayici
Dikkat Edilmesi Gereken Noktalar
Güvenlik konusunda birkaç önemli nokta var. Öncelikle:
- Kimlik bilgilerini koda gömmek yerine
.envdosyası veya HashiCorp Vault gibi araçlar kullanın - Script’leri doğrudan root olarak çalıştırmak yerine sudo ile belirli komutlara yetki verin
- Log dosyalarına hassas bilgi yazmaktan kaçının, özellikle şifreler ve API anahtarları
- Üretim ortamında test etmeden önce staging ortamında deneyin, özellikle toplu işlemler için
- Her script için timeout değeri belirleyin, sonsuza kadar bekleyen bir script gece boyunca kaynakları tüketebilir
Hata yönetimi de kritik. Her external bağlantı, her dosya işlemi try-except bloğu içinde olmalı. Sessizce çöken bir otomasyon script’i, hiç yazılmamış olan’dan daha tehlikeli.
Sonuç
Python ile otomasyon başlangıçta biraz yatırım gerektiriyor: script yazmak, test etmek, monitoring’ini kurmak. Ama sonrasında kazanılan vakit ve azalan hata oranı bu yatırımı çok kısa sürede amorti ediyor.
Burada anlattığım scriptlerin hepsini sıfırdan yazmak zorunda değilsiniz. Kendi ortamınıza göre adapte edin. Önemli olan başlamak; küçük bir disk monitoring script’iyle bile olsa. Zamanla her tekrar eden görevi Python ile çözmeye başladığınızda, geriye dönüp “bunu neden daha önce yapmadım” diye soracaksınız.
Bir uyarı ile bitireyim: otomasyonun kendisini de izleyin. Otomasyon script’lerinizin çalışıp çalışmadığını, ne zaman son çalıştığını takip etmeyen sistemlerde sessiz hatalar birikerek büyük sorunlara dönüşebiliyor. Script’lerinizin son çalışma zamanını ve durumunu merkezi bir yerde tutun, üzerine alert ekleyin.
