Python ile Toplu Sunucu Yönetimi: Paralel İşlem Teknikleri

Yüzlerce sunucuya aynı anda bağlanıp bir şeyler yapmak gerektiğinde, sıralı çalışan Bash scriptleri bir noktadan sonra çekilmez hale geliyor. 50 sunucuya sırayla SSH açıp komut çalıştırmak, en iyi ihtimalle 10-15 dakika alıyor. Ama Python’ın paralel işlem kütüphaneleriyle bu süreyi dramatik biçimde kısaltmak mümkün. Bu yazıda, gerçek dünya senaryoları üzerinden Python ile toplu sunucu yönetimini nasıl verimli hale getirdiğimizi ele alacağız.

Neden Paralel İşlem?

Klasik sysadmin yaklaşımında bir for döngüsü yazıp sunucuları tek tek ziyaret ederiz. Bu senkron model küçük altyapılarda işe yarar ama 200-300 sunucuya çıktığınızda sabahın köründe terminale bakakalmak zorunda kalırsınız.

Paralel işlemin iki temel faydası var:

  • Zaman tasarrufu: 100 sunucuya 5 saniye süren bir komut çalıştırmak, sıralı yapıda 500 saniye alırken paralelde teorik minimum 5 saniyeye iniyor
  • Kaynak verimliliği: CPU beklerken ağ I/O’su çalışıyor, ağ beklerken başka bağlantılar kuruluyor

Python’da bu işi yapmanın birkaç yolu var: threading, multiprocessing, concurrent.futures ve asyncio. Sunucu yönetimi gibi I/O’ya bağlı (network-bound) işlemler için concurrent.futures.ThreadPoolExecutor ve asyncio en uygun seçenekler.

Temel Yapı: ThreadPoolExecutor ile Başlamak

En sade haliyle paralel SSH işlemi şöyle görünüyor:

# Önce gerekli kütüphaneyi kuralım
pip install paramiko
import paramiko
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

# Sunucu listesi - gerçekte bir dosyadan veya veritabanından okunabilir
SUNUCULAR = [
    {"host": "10.0.1.1", "port": 22},
    {"host": "10.0.1.2", "port": 22},
    {"host": "10.0.1.3", "port": 22},
    # ... devamı
]

def ssh_komut_calistir(sunucu_bilgi, komut, kullanici, anahtar_yolu):
    """Tek bir sunucuya SSH bağlanıp komut çalıştırır"""
    host = sunucu_bilgi["host"]
    port = sunucu_bilgi.get("port", 22)
    
    sonuc = {
        "host": host,
        "basarili": False,
        "cikti": "",
        "hata": ""
    }
    
    try:
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(
            hostname=host,
            port=port,
            username=kullanici,
            key_filename=anahtar_yolu,
            timeout=10
        )
        
        stdin, stdout, stderr = ssh.exec_command(komut, timeout=30)
        sonuc["cikti"] = stdout.read().decode("utf-8").strip()
        sonuc["hata"] = stderr.read().decode("utf-8").strip()
        sonuc["basarili"] = True
        ssh.close()
        
    except Exception as e:
        sonuc["hata"] = str(e)
    
    return sonuc

def paralel_calistir(sunucular, komut, kullanici, anahtar_yolu, max_thread=20):
    """Sunucularda paralel komut çalıştırır"""
    sonuclar = []
    basarili = 0
    basarisiz = 0
    
    with ThreadPoolExecutor(max_workers=max_thread) as executor:
        gelecekler = {
            executor.submit(
                ssh_komut_calistir, sunucu, komut, kullanici, anahtar_yolu
            ): sunucu
            for sunucu in sunucular
        }
        
        for gelecek in as_completed(gelecekler):
            sonuc = gelecek.result()
            sonuclar.append(sonuc)
            
            if sonuc["basarili"]:
                basarili += 1
                print(f"[OK] {sonuc['host']}: {sonuc['cikti'][:80]}")
            else:
                basarisiz += 1
                print(f"[HATA] {sonuc['host']}: {sonuc['hata']}")
    
    print(f"nToplam: {len(sunucular)} | Başarılı: {basarili} | Başarısız: {basarisiz}")
    return sonuclar

# Kullanım
if __name__ == "__main__":
    basla = time.time()
    sonuclar = paralel_calistir(
        sunucular=SUNUCULAR,
        komut="uptime && df -h / | tail -1",
        kullanici="admin",
        anahtar_yolu="/home/admin/.ssh/id_rsa",
        max_thread=20
    )
    print(f"Toplam süre: {time.time() - basla:.2f} saniye")

max_thread değerini kafanıza göre çok yüksek yazmayın. 200 sunucunuz varsa 200 thread açmak sisteminizi zorlayabilir ve hedefteki sunucularda da bağlantı reddi almanıza yol açabilir. 15-30 arası genellikle dengeli bir başlangıç noktası.

Gerçek Dünya Senaryosu 1: Disk Kullanım Raporu

Sabah geldiniz, monitoring sisteminden birkaç sunucunun disk dolmaya başladığına dair alarm aldınız. Hangi sunucuda ne kadar yer kaldığını hızlıca görmek istiyorsunuz.

import paramiko
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
from datetime import datetime

def disk_kullanim_al(host, kullanici, anahtar_yolu):
    """Sunucudan disk kullanım bilgisini çeker"""
    sonuc = {"host": host, "disk_verileri": [], "hata": None}
    
    try:
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(host, username=kullanici, key_filename=anahtar_yolu, timeout=8)
        
        # df çıktısını makine okunabilir formatta al
        komut = "df -h --output=source,size,used,avail,pcent,target | grep -v tmpfs | grep -v Filesystem"
        _, stdout, _ = ssh.exec_command(komut, timeout=15)
        
        for satir in stdout.readlines():
            parcalar = satir.strip().split()
            if len(parcalar) >= 6:
                yuzde = int(parcalar[4].replace("%", ""))
                sonuc["disk_verileri"].append({
                    "aygit": parcalar[0],
                    "boyut": parcalar[1],
                    "kullanilan": parcalar[2],
                    "bos": parcalar[3],
                    "yuzde": yuzde,
                    "mount": parcalar[5]
                })
        
        ssh.close()
    except Exception as e:
        sonuc["hata"] = str(e)
    
    return sonuc

def disk_raporu_olustur(sunucu_listesi, kullanici, anahtar_yolu, esik=80):
    """Tüm sunucuların disk raporunu oluşturur, eşik üstündekileri vurgular"""
    
    kritik_sunucular = []
    tum_sonuclar = []
    
    with ThreadPoolExecutor(max_workers=25) as executor:
        gelecekler = {
            executor.submit(disk_kullanim_al, host, kullanici, anahtar_yolu): host
            for host in sunucu_listesi
        }
        
        for gelecek in as_completed(gelecekler):
            sonuc = gelecek.result()
            tum_sonuclar.append(sonuc)
            
            if sonuc["hata"]:
                print(f"[ULASILAMADI] {sonuc['host']}: {sonuc['hata']}")
                continue
            
            for disk in sonuc["disk_verileri"]:
                if disk["yuzde"] >= esik:
                    kritik_sunucular.append({
                        "host": sonuc["host"],
                        "mount": disk["mount"],
                        "kullanim": disk["yuzde"],
                        "bos": disk["bos"]
                    })
                    print(f"[KRITIK] {sonuc['host']} - {disk['mount']}: %{disk['yuzde']} doldu, {disk['bos']} kaldı")
    
    # Raporu JSON'a yaz
    rapor_dosya = f"disk_rapor_{datetime.now().strftime('%Y%m%d_%H%M')}.json"
    with open(rapor_dosya, "w") as f:
        json.dump({
            "tarih": datetime.now().isoformat(),
            "toplam_sunucu": len(sunucu_listesi),
            "kritik": kritik_sunucular,
            "detay": tum_sonuclar
        }, f, indent=2, ensure_ascii=False)
    
    print(f"nRapor kaydedildi: {rapor_dosya}")
    print(f"Kritik sunucu sayısı: {len(kritik_sunucular)}")

Gerçek Dünya Senaryosu 2: Toplu Paket Güncelleme

En sık yapılan toplu işlemlerden biri güvenlik yamaları. Ama burada dikkatli olmak lazım, bazı sunuculara reboot gerekirken bazılarına gerekmeyebilir, bazı sunucular production’da aktif trafik alıyor olabilir.

import paramiko
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

# Thread-safe loglama için kilit
log_kilidi = threading.Lock()

def guvenli_log(mesaj):
    with log_kilidi:
        print(mesaj)

def paket_guncelle(sunucu_bilgi, kullanici, anahtar_yolu, kuru_calistir=True):
    """
    Sunucuda paket güncellemesi yapar
    kuru_calistir=True ise gerçekte güncelleme yapmaz, sadece neyin güncelleneceğini gösterir
    """
    host = sunucu_bilgi["host"]
    grup = sunucu_bilgi.get("grup", "bilinmiyor")
    
    sonuc = {
        "host": host,
        "grup": grup,
        "guncellenen_paket": [],
        "basarili": False,
        "reboot_gerekli": False
    }
    
    try:
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(host, username=kullanici, key_filename=anahtar_yolu, timeout=10)
        
        # Önce listeyi güncelle
        _, stdout, stderr = ssh.exec_command("sudo apt-get update -qq 2>&1", timeout=60)
        stdout.read()  # tamamlanmasını bekle
        
        if kuru_calistir:
            # Sadece neyin güncelleneceğini göster
            komut = "apt list --upgradable 2>/dev/null | grep -v 'Listing'"
        else:
            # Gerçek güncelleme
            komut = "sudo DEBIAN_FRONTEND=noninteractive apt-get upgrade -y 2>&1"
        
        _, stdout, _ = ssh.exec_command(komut, timeout=300)
        cikti = stdout.read().decode("utf-8").strip()
        
        if cikti:
            sonuc["guncellenen_paket"] = [s.strip() for s in cikti.splitlines() if s.strip()]
        
        # Reboot gerekip gerekmediğini kontrol et
        _, stdout, _ = ssh.exec_command("test -f /var/run/reboot-required && echo 'REBOOT_GEREKLI'", timeout=5)
        if "REBOOT_GEREKLI" in stdout.read().decode():
            sonuc["reboot_gerekli"] = True
        
        sonuc["basarili"] = True
        ssh.close()
        
        durum = "KURUCALISTlRMA" if kuru_calistir else "GUNCELLENDI"
        reboot_uyari = " [REBOOT GEREKLI]" if sonuc["reboot_gerekli"] else ""
        guvenli_log(f"[{durum}] {host} ({grup}): {len(sonuc['guncellenen_paket'])} paket{reboot_uyari}")
        
    except Exception as e:
        sonuc["hata"] = str(e)
        guvenli_log(f"[HATA] {host}: {str(e)}")
    
    return sonuc

def toplu_guncelleme(sunucular, kullanici, anahtar_yolu, kuru_calistir=True, max_thread=10):
    """Belirli sunucu gruplarını belirli sırayla günceller"""
    
    # Önce production dışını güncelle
    gruplar = {}
    for s in sunucular:
        grup = s.get("grup", "diger")
        gruplar.setdefault(grup, []).append(s)
    
    oncelik_sirasi = ["dev", "staging", "production"]
    
    tum_sonuclar = []
    
    for grup in oncelik_sirasi:
        if grup not in gruplar:
            continue
        
        print(f"n--- {grup.upper()} grubu işleniyor ({len(gruplar[grup])} sunucu) ---")
        
        with ThreadPoolExecutor(max_workers=max_thread) as executor:
            gelecekler = {
                executor.submit(paket_guncelle, sunucu, kullanici, anahtar_yolu, kuru_calistir): sunucu
                for sunucu in gruplar[grup]
            }
            
            for gelecek in as_completed(gelecekler):
                sonuc = gelecek.result()
                tum_sonuclar.append(sonuc)
        
        # Bir sonraki gruba geçmeden onay iste (kuru çalıştırma değilse)
        if not kuru_calistir and grup != "production":
            onay = input(f"n{grup} tamamlandı. Devam edilsin mi? (e/h): ")
            if onay.lower() != "e":
                print("İşlem durduruldu.")
                break
    
    return tum_sonuclar

Hata Yönetimi ve Yeniden Deneme Mekanizması

Ağ üzerinde çalışırken bağlantı hataları kaçınılmaz. Akıllı bir retry mekanizması olmadan scriptiniz güvenilmez hale gelir.

import time
import paramiko
from functools import wraps

def yeniden_dene(max_deneme=3, bekleme_suresi=2, katlayici=True):
    """
    Decorator: Başarısız olan fonksiyonu otomatik yeniden dener
    katlayici=True ise her denemede bekleme süresini ikiye katlar
    """
    def dekorator(fonksiyon):
        @wraps(fonksiyon)
        def sarici(*args, **kwargs):
            bekleme = bekleme_suresi
            son_hata = None
            
            for deneme in range(1, max_deneme + 1):
                try:
                    return fonksiyon(*args, **kwargs)
                except (paramiko.ssh_exception.NoValidConnectionsError,
                        paramiko.ssh_exception.SSHException,
                        TimeoutError,
                        ConnectionResetError) as e:
                    son_hata = e
                    if deneme < max_deneme:
                        print(f"[DENEME {deneme}/{max_deneme}] {args[0] if args else ''}: {str(e)[:60]} - {bekleme}sn bekleniyor")
                        time.sleep(bekleme)
                        if katlayici:
                            bekleme *= 2
                    continue
                except Exception as e:
                    # Retry yapmaya değmeyecek hatalar (yetki hatası gibi)
                    raise e
            
            raise Exception(f"{max_deneme} denemede başarısız: {son_hata}")
        
        return sarici
    return dekorator

@yeniden_dene(max_deneme=3, bekleme_suresi=2, katlayici=True)
def guvenilir_ssh_komut(host, komut, kullanici, anahtar_yolu):
    """Retry mekanizması olan güvenilir SSH komut çalıştırıcı"""
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(host, username=kullanici, key_filename=anahtar_yolu, timeout=10)
    
    _, stdout, stderr = ssh.exec_command(komut, timeout=30)
    cikti = stdout.read().decode("utf-8").strip()
    hata = stderr.read().decode("utf-8").strip()
    ssh.close()
    
    return {"cikti": cikti, "hata": hata}

Asyncio ile Daha İleri Gitmek

Çok yüksek sunucu sayısıyla (500+) çalışıyorsanız, thread tabanlı yaklaşım yerine asyncio ve asyncssh kombinasyonu çok daha verimli.

pip install asyncssh
import asyncio
import asyncssh
import time

async def async_ssh_komut(host, komut, kullanici, anahtar_yolu, semaphore):
    """Asyncio ile tek sunucuya SSH"""
    async with semaphore:  # Eş zamanlı bağlantı sayısını sınırla
        try:
            async with asyncssh.connect(
                host,
                username=kullanici,
                client_keys=[anahtar_yolu],
                known_hosts=None,
                connect_timeout=10
            ) as baglanti:
                sonuc = await baglanti.run(komut, timeout=30)
                return {
                    "host": host,
                    "cikti": sonuc.stdout.strip(),
                    "hata": sonuc.stderr.strip(),
                    "donus_kodu": sonuc.returncode,
                    "basarili": sonuc.returncode == 0
                }
        except Exception as e:
            return {
                "host": host,
                "cikti": "",
                "hata": str(e),
                "basarili": False
            }

async def toplu_async_calistir(sunucular, komut, kullanici, anahtar_yolu, max_esanlili=50):
    """Tüm sunucularda asenkron komut çalıştırır"""
    
    # Semaphore ile eş zamanlı bağlantı sınırı
    semaphore = asyncio.Semaphore(max_esanlili)
    
    gorevler = [
        async_ssh_komut(host, komut, kullanici, anahtar_yolu, semaphore)
        for host in sunucular
    ]
    
    # gather ile tüm görevleri paralel çalıştır
    sonuclar = await asyncio.gather(*gorevler, return_exceptions=True)
    
    basarili = sum(1 for s in sonuclar if isinstance(s, dict) and s.get("basarili"))
    print(f"Tamamlandı: {basarili}/{len(sunucular)} başarılı")
    
    return sonuclar

# Kullanım
if __name__ == "__main__":
    sunucular = [f"10.0.1.{i}" for i in range(1, 101)]  # 100 sunucu
    
    basla = time.time()
    sonuclar = asyncio.run(
        toplu_async_calistir(
            sunucular=sunucular,
            komut="hostname && uname -r",
            kullanici="admin",
            anahtar_yolu="/home/admin/.ssh/id_rsa",
            max_esanlili=30
        )
    )
    print(f"100 sunucu toplam süre: {time.time() - basla:.2f} saniye")

Dosya Dağıtımı: Çok Sayıda Sunucuya Aynı Anda Dosya Kopyalamak

Konfigürasyon dosyası, script veya binary dağıtmak sysadmin’in günlük işlerinden biri. Bunu da paralel yapalım.

import paramiko
from concurrent.futures import ThreadPoolExecutor, as_completed
import os

def sftp_dosya_gonder(host, yerel_yol, uzak_yol, kullanici, anahtar_yolu):
    """SFTP ile dosya gönderir, gerekirse dizin oluşturur"""
    sonuc = {"host": host, "basarili": False, "hata": None}
    
    try:
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(host, username=kullanici, key_filename=anahtar_yolu, timeout=10)
        
        sftp = ssh.open_sftp()
        
        # Uzak dizini oluştur (yoksa)
        uzak_dizin = os.path.dirname(uzak_yol)
        try:
            sftp.stat(uzak_dizin)
        except FileNotFoundError:
            ssh.exec_command(f"mkdir -p {uzak_dizin}")
            import time
            time.sleep(0.5)
        
        # Dosyayı gönder
        sftp.put(yerel_yol, uzak_yol)
        
        # Dosya boyutunu doğrula
        uzak_boyut = sftp.stat(uzak_yol).st_size
        yerel_boyut = os.path.getsize(yerel_yol)
        
        if uzak_boyut != yerel_boyut:
            raise Exception(f"Boyut uyuşmazlığı: yerel={yerel_boyut}, uzak={uzak_boyut}")
        
        sftp.close()
        ssh.close()
        
        sonuc["basarili"] = True
        print(f"[GONDERILDI] {host}: {uzak_yol} ({yerel_boyut} byte)")
        
    except Exception as e:
        sonuc["hata"] = str(e)
        print(f"[HATA] {host}: {str(e)}")
    
    return sonuc

def toplu_dosya_dagit(sunucular, yerel_yol, uzak_yol, kullanici, anahtar_yolu):
    """Dosyayı tüm sunuculara dağıtır"""
    
    if not os.path.exists(yerel_yol):
        raise FileNotFoundError(f"Kaynak dosya bulunamadı: {yerel_yol}")
    
    print(f"Dağıtılıyor: {yerel_yol} -> {uzak_yol} ({len(sunucular)} sunucu)")
    
    with ThreadPoolExecutor(max_workers=15) as executor:
        gelecekler = {
            executor.submit(sftp_dosya_gonder, host, yerel_yol, uzak_yol, kullanici, anahtar_yolu): host
            for host in sunucular
        }
        
        sonuclar = [gelecek.result() for gelecek in as_completed(gelecekler)]
    
    basarili = sum(1 for s in sonuclar if s["basarili"])
    print(f"nDağıtım tamamlandı: {basarili}/{len(sunucular)}")
    
    return sonuclar

Pratik İpuçları ve Dikkat Edilecekler

Paralel SSH işlerinde öğrendiğim bazı kritik noktalar:

  • SSH key authentication şart: Şifre ile paralel bağlantı açmaya çalışmak hem güvensiz hem de çok daha yavaş
  • Timeout değerlerini küçümseme: Ağ sorunlarında threadlerinizin sonsuza dek beklemesini önlemek için hem connect hem exec_command için timeout belirleyin
  • Known hosts yönetimi: AutoAddPolicy() lab ortamında kullanışlı ama üretimde RejectPolicy veya önceden güncellenmiş bir known_hosts dosyası kullanın
  • Thread sayısına dikkat: Hedef sunucularda MaxSessions ve MaxStartups SSH parametreleri aynı anda çok fazla bağlantıyı reddedebilir. Çoğu OpenSSH varsayılanı 10 eş zamanlı unauthenticated bağlantıya izin verir
  • Çıktıları merkezi loglama sistemine yönlendirin: print() ile ekrana basmak 200 sunucu için çok gürültülü, sonuçları bir dosyaya veya ELK/Loki gibi bir sisteme gönderin
  • İdempotent scriptler yazın: Aynı scripti iki kez çalıştırmanın zarar vermeyeceğinden emin olun. Bir thread yarıda kalabilir, retry yapıldığında önceki durumu bozmamak kritik
  • Rate limiting: Çok agresif paralel işlem, switch’lerde connection flood alarmı tetikleyebilir. Büyük altyapılarda 50+ thread açmadan önce ağ ekibiyle konuşun

Sonuç

Python ile paralel sunucu yönetimi, sysadmin işini hem hızlandırıyor hem de kod olarak belgelemiş oluyor. ThreadPoolExecutor günlük 50-100 sunucu senaryoları için yeterince hızlı ve anlaşılır. Daha büyük ölçeğe çıktığınızda asyncssh + asyncio kombinasyonu bellek ve performans açısından çok daha avantajlı.

Yazıdaki kodları kopyalayıp doğrudan üretimde çalıştırmayın, kendi altyapınıza göre uyarlayın. Özellikle timeout değerleri, thread sayıları ve hata yönetimi ağ yapınıza ve sunucu sayınıza göre ayar gerektirir. Yeni bir script yazmadan önce her zaman birkaç test sunucusunda deneyin, kuru_calistir=True gibi güvenli modlar ekleyin ve ne yaptığını bilmeden bir şeyi 300 sunucuya uygulamayın.

Son olarak: Fabric ve Ansible gibi olgun araçlar zaten bu problemleri çözmüş durumda. Ama Python’ı sıfırdan anlamak, bu araçların içinde ne döndüğünü kavramak ve özel senaryolara adapte etmek için vazgeçilmez. İkisini birbirinin alternatifi değil, tamamlayıcısı olarak görün.

Yorum yapın