Yüzlerce sunucuya aynı anda bağlanıp bir şeyler yapmak gerektiğinde, sıralı çalışan Bash scriptleri bir noktadan sonra çekilmez hale geliyor. 50 sunucuya sırayla SSH açıp komut çalıştırmak, en iyi ihtimalle 10-15 dakika alıyor. Ama Python’ın paralel işlem kütüphaneleriyle bu süreyi dramatik biçimde kısaltmak mümkün. Bu yazıda, gerçek dünya senaryoları üzerinden Python ile toplu sunucu yönetimini nasıl verimli hale getirdiğimizi ele alacağız.
Neden Paralel İşlem?
Klasik sysadmin yaklaşımında bir for döngüsü yazıp sunucuları tek tek ziyaret ederiz. Bu senkron model küçük altyapılarda işe yarar ama 200-300 sunucuya çıktığınızda sabahın köründe terminale bakakalmak zorunda kalırsınız.
Paralel işlemin iki temel faydası var:
- Zaman tasarrufu: 100 sunucuya 5 saniye süren bir komut çalıştırmak, sıralı yapıda 500 saniye alırken paralelde teorik minimum 5 saniyeye iniyor
- Kaynak verimliliği: CPU beklerken ağ I/O’su çalışıyor, ağ beklerken başka bağlantılar kuruluyor
Python’da bu işi yapmanın birkaç yolu var: threading, multiprocessing, concurrent.futures ve asyncio. Sunucu yönetimi gibi I/O’ya bağlı (network-bound) işlemler için concurrent.futures.ThreadPoolExecutor ve asyncio en uygun seçenekler.
Temel Yapı: ThreadPoolExecutor ile Başlamak
En sade haliyle paralel SSH işlemi şöyle görünüyor:
# Önce gerekli kütüphaneyi kuralım
pip install paramiko
import paramiko
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
# Sunucu listesi - gerçekte bir dosyadan veya veritabanından okunabilir
SUNUCULAR = [
{"host": "10.0.1.1", "port": 22},
{"host": "10.0.1.2", "port": 22},
{"host": "10.0.1.3", "port": 22},
# ... devamı
]
def ssh_komut_calistir(sunucu_bilgi, komut, kullanici, anahtar_yolu):
"""Tek bir sunucuya SSH bağlanıp komut çalıştırır"""
host = sunucu_bilgi["host"]
port = sunucu_bilgi.get("port", 22)
sonuc = {
"host": host,
"basarili": False,
"cikti": "",
"hata": ""
}
try:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(
hostname=host,
port=port,
username=kullanici,
key_filename=anahtar_yolu,
timeout=10
)
stdin, stdout, stderr = ssh.exec_command(komut, timeout=30)
sonuc["cikti"] = stdout.read().decode("utf-8").strip()
sonuc["hata"] = stderr.read().decode("utf-8").strip()
sonuc["basarili"] = True
ssh.close()
except Exception as e:
sonuc["hata"] = str(e)
return sonuc
def paralel_calistir(sunucular, komut, kullanici, anahtar_yolu, max_thread=20):
"""Sunucularda paralel komut çalıştırır"""
sonuclar = []
basarili = 0
basarisiz = 0
with ThreadPoolExecutor(max_workers=max_thread) as executor:
gelecekler = {
executor.submit(
ssh_komut_calistir, sunucu, komut, kullanici, anahtar_yolu
): sunucu
for sunucu in sunucular
}
for gelecek in as_completed(gelecekler):
sonuc = gelecek.result()
sonuclar.append(sonuc)
if sonuc["basarili"]:
basarili += 1
print(f"[OK] {sonuc['host']}: {sonuc['cikti'][:80]}")
else:
basarisiz += 1
print(f"[HATA] {sonuc['host']}: {sonuc['hata']}")
print(f"nToplam: {len(sunucular)} | Başarılı: {basarili} | Başarısız: {basarisiz}")
return sonuclar
# Kullanım
if __name__ == "__main__":
basla = time.time()
sonuclar = paralel_calistir(
sunucular=SUNUCULAR,
komut="uptime && df -h / | tail -1",
kullanici="admin",
anahtar_yolu="/home/admin/.ssh/id_rsa",
max_thread=20
)
print(f"Toplam süre: {time.time() - basla:.2f} saniye")
max_thread değerini kafanıza göre çok yüksek yazmayın. 200 sunucunuz varsa 200 thread açmak sisteminizi zorlayabilir ve hedefteki sunucularda da bağlantı reddi almanıza yol açabilir. 15-30 arası genellikle dengeli bir başlangıç noktası.
Gerçek Dünya Senaryosu 1: Disk Kullanım Raporu
Sabah geldiniz, monitoring sisteminden birkaç sunucunun disk dolmaya başladığına dair alarm aldınız. Hangi sunucuda ne kadar yer kaldığını hızlıca görmek istiyorsunuz.
import paramiko
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
from datetime import datetime
def disk_kullanim_al(host, kullanici, anahtar_yolu):
"""Sunucudan disk kullanım bilgisini çeker"""
sonuc = {"host": host, "disk_verileri": [], "hata": None}
try:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(host, username=kullanici, key_filename=anahtar_yolu, timeout=8)
# df çıktısını makine okunabilir formatta al
komut = "df -h --output=source,size,used,avail,pcent,target | grep -v tmpfs | grep -v Filesystem"
_, stdout, _ = ssh.exec_command(komut, timeout=15)
for satir in stdout.readlines():
parcalar = satir.strip().split()
if len(parcalar) >= 6:
yuzde = int(parcalar[4].replace("%", ""))
sonuc["disk_verileri"].append({
"aygit": parcalar[0],
"boyut": parcalar[1],
"kullanilan": parcalar[2],
"bos": parcalar[3],
"yuzde": yuzde,
"mount": parcalar[5]
})
ssh.close()
except Exception as e:
sonuc["hata"] = str(e)
return sonuc
def disk_raporu_olustur(sunucu_listesi, kullanici, anahtar_yolu, esik=80):
"""Tüm sunucuların disk raporunu oluşturur, eşik üstündekileri vurgular"""
kritik_sunucular = []
tum_sonuclar = []
with ThreadPoolExecutor(max_workers=25) as executor:
gelecekler = {
executor.submit(disk_kullanim_al, host, kullanici, anahtar_yolu): host
for host in sunucu_listesi
}
for gelecek in as_completed(gelecekler):
sonuc = gelecek.result()
tum_sonuclar.append(sonuc)
if sonuc["hata"]:
print(f"[ULASILAMADI] {sonuc['host']}: {sonuc['hata']}")
continue
for disk in sonuc["disk_verileri"]:
if disk["yuzde"] >= esik:
kritik_sunucular.append({
"host": sonuc["host"],
"mount": disk["mount"],
"kullanim": disk["yuzde"],
"bos": disk["bos"]
})
print(f"[KRITIK] {sonuc['host']} - {disk['mount']}: %{disk['yuzde']} doldu, {disk['bos']} kaldı")
# Raporu JSON'a yaz
rapor_dosya = f"disk_rapor_{datetime.now().strftime('%Y%m%d_%H%M')}.json"
with open(rapor_dosya, "w") as f:
json.dump({
"tarih": datetime.now().isoformat(),
"toplam_sunucu": len(sunucu_listesi),
"kritik": kritik_sunucular,
"detay": tum_sonuclar
}, f, indent=2, ensure_ascii=False)
print(f"nRapor kaydedildi: {rapor_dosya}")
print(f"Kritik sunucu sayısı: {len(kritik_sunucular)}")
Gerçek Dünya Senaryosu 2: Toplu Paket Güncelleme
En sık yapılan toplu işlemlerden biri güvenlik yamaları. Ama burada dikkatli olmak lazım, bazı sunuculara reboot gerekirken bazılarına gerekmeyebilir, bazı sunucular production’da aktif trafik alıyor olabilir.
import paramiko
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
# Thread-safe loglama için kilit
log_kilidi = threading.Lock()
def guvenli_log(mesaj):
with log_kilidi:
print(mesaj)
def paket_guncelle(sunucu_bilgi, kullanici, anahtar_yolu, kuru_calistir=True):
"""
Sunucuda paket güncellemesi yapar
kuru_calistir=True ise gerçekte güncelleme yapmaz, sadece neyin güncelleneceğini gösterir
"""
host = sunucu_bilgi["host"]
grup = sunucu_bilgi.get("grup", "bilinmiyor")
sonuc = {
"host": host,
"grup": grup,
"guncellenen_paket": [],
"basarili": False,
"reboot_gerekli": False
}
try:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(host, username=kullanici, key_filename=anahtar_yolu, timeout=10)
# Önce listeyi güncelle
_, stdout, stderr = ssh.exec_command("sudo apt-get update -qq 2>&1", timeout=60)
stdout.read() # tamamlanmasını bekle
if kuru_calistir:
# Sadece neyin güncelleneceğini göster
komut = "apt list --upgradable 2>/dev/null | grep -v 'Listing'"
else:
# Gerçek güncelleme
komut = "sudo DEBIAN_FRONTEND=noninteractive apt-get upgrade -y 2>&1"
_, stdout, _ = ssh.exec_command(komut, timeout=300)
cikti = stdout.read().decode("utf-8").strip()
if cikti:
sonuc["guncellenen_paket"] = [s.strip() for s in cikti.splitlines() if s.strip()]
# Reboot gerekip gerekmediğini kontrol et
_, stdout, _ = ssh.exec_command("test -f /var/run/reboot-required && echo 'REBOOT_GEREKLI'", timeout=5)
if "REBOOT_GEREKLI" in stdout.read().decode():
sonuc["reboot_gerekli"] = True
sonuc["basarili"] = True
ssh.close()
durum = "KURUCALISTlRMA" if kuru_calistir else "GUNCELLENDI"
reboot_uyari = " [REBOOT GEREKLI]" if sonuc["reboot_gerekli"] else ""
guvenli_log(f"[{durum}] {host} ({grup}): {len(sonuc['guncellenen_paket'])} paket{reboot_uyari}")
except Exception as e:
sonuc["hata"] = str(e)
guvenli_log(f"[HATA] {host}: {str(e)}")
return sonuc
def toplu_guncelleme(sunucular, kullanici, anahtar_yolu, kuru_calistir=True, max_thread=10):
"""Belirli sunucu gruplarını belirli sırayla günceller"""
# Önce production dışını güncelle
gruplar = {}
for s in sunucular:
grup = s.get("grup", "diger")
gruplar.setdefault(grup, []).append(s)
oncelik_sirasi = ["dev", "staging", "production"]
tum_sonuclar = []
for grup in oncelik_sirasi:
if grup not in gruplar:
continue
print(f"n--- {grup.upper()} grubu işleniyor ({len(gruplar[grup])} sunucu) ---")
with ThreadPoolExecutor(max_workers=max_thread) as executor:
gelecekler = {
executor.submit(paket_guncelle, sunucu, kullanici, anahtar_yolu, kuru_calistir): sunucu
for sunucu in gruplar[grup]
}
for gelecek in as_completed(gelecekler):
sonuc = gelecek.result()
tum_sonuclar.append(sonuc)
# Bir sonraki gruba geçmeden onay iste (kuru çalıştırma değilse)
if not kuru_calistir and grup != "production":
onay = input(f"n{grup} tamamlandı. Devam edilsin mi? (e/h): ")
if onay.lower() != "e":
print("İşlem durduruldu.")
break
return tum_sonuclar
Hata Yönetimi ve Yeniden Deneme Mekanizması
Ağ üzerinde çalışırken bağlantı hataları kaçınılmaz. Akıllı bir retry mekanizması olmadan scriptiniz güvenilmez hale gelir.
import time
import paramiko
from functools import wraps
def yeniden_dene(max_deneme=3, bekleme_suresi=2, katlayici=True):
"""
Decorator: Başarısız olan fonksiyonu otomatik yeniden dener
katlayici=True ise her denemede bekleme süresini ikiye katlar
"""
def dekorator(fonksiyon):
@wraps(fonksiyon)
def sarici(*args, **kwargs):
bekleme = bekleme_suresi
son_hata = None
for deneme in range(1, max_deneme + 1):
try:
return fonksiyon(*args, **kwargs)
except (paramiko.ssh_exception.NoValidConnectionsError,
paramiko.ssh_exception.SSHException,
TimeoutError,
ConnectionResetError) as e:
son_hata = e
if deneme < max_deneme:
print(f"[DENEME {deneme}/{max_deneme}] {args[0] if args else ''}: {str(e)[:60]} - {bekleme}sn bekleniyor")
time.sleep(bekleme)
if katlayici:
bekleme *= 2
continue
except Exception as e:
# Retry yapmaya değmeyecek hatalar (yetki hatası gibi)
raise e
raise Exception(f"{max_deneme} denemede başarısız: {son_hata}")
return sarici
return dekorator
@yeniden_dene(max_deneme=3, bekleme_suresi=2, katlayici=True)
def guvenilir_ssh_komut(host, komut, kullanici, anahtar_yolu):
"""Retry mekanizması olan güvenilir SSH komut çalıştırıcı"""
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(host, username=kullanici, key_filename=anahtar_yolu, timeout=10)
_, stdout, stderr = ssh.exec_command(komut, timeout=30)
cikti = stdout.read().decode("utf-8").strip()
hata = stderr.read().decode("utf-8").strip()
ssh.close()
return {"cikti": cikti, "hata": hata}
Asyncio ile Daha İleri Gitmek
Çok yüksek sunucu sayısıyla (500+) çalışıyorsanız, thread tabanlı yaklaşım yerine asyncio ve asyncssh kombinasyonu çok daha verimli.
pip install asyncssh
import asyncio
import asyncssh
import time
async def async_ssh_komut(host, komut, kullanici, anahtar_yolu, semaphore):
"""Asyncio ile tek sunucuya SSH"""
async with semaphore: # Eş zamanlı bağlantı sayısını sınırla
try:
async with asyncssh.connect(
host,
username=kullanici,
client_keys=[anahtar_yolu],
known_hosts=None,
connect_timeout=10
) as baglanti:
sonuc = await baglanti.run(komut, timeout=30)
return {
"host": host,
"cikti": sonuc.stdout.strip(),
"hata": sonuc.stderr.strip(),
"donus_kodu": sonuc.returncode,
"basarili": sonuc.returncode == 0
}
except Exception as e:
return {
"host": host,
"cikti": "",
"hata": str(e),
"basarili": False
}
async def toplu_async_calistir(sunucular, komut, kullanici, anahtar_yolu, max_esanlili=50):
"""Tüm sunucularda asenkron komut çalıştırır"""
# Semaphore ile eş zamanlı bağlantı sınırı
semaphore = asyncio.Semaphore(max_esanlili)
gorevler = [
async_ssh_komut(host, komut, kullanici, anahtar_yolu, semaphore)
for host in sunucular
]
# gather ile tüm görevleri paralel çalıştır
sonuclar = await asyncio.gather(*gorevler, return_exceptions=True)
basarili = sum(1 for s in sonuclar if isinstance(s, dict) and s.get("basarili"))
print(f"Tamamlandı: {basarili}/{len(sunucular)} başarılı")
return sonuclar
# Kullanım
if __name__ == "__main__":
sunucular = [f"10.0.1.{i}" for i in range(1, 101)] # 100 sunucu
basla = time.time()
sonuclar = asyncio.run(
toplu_async_calistir(
sunucular=sunucular,
komut="hostname && uname -r",
kullanici="admin",
anahtar_yolu="/home/admin/.ssh/id_rsa",
max_esanlili=30
)
)
print(f"100 sunucu toplam süre: {time.time() - basla:.2f} saniye")
Dosya Dağıtımı: Çok Sayıda Sunucuya Aynı Anda Dosya Kopyalamak
Konfigürasyon dosyası, script veya binary dağıtmak sysadmin’in günlük işlerinden biri. Bunu da paralel yapalım.
import paramiko
from concurrent.futures import ThreadPoolExecutor, as_completed
import os
def sftp_dosya_gonder(host, yerel_yol, uzak_yol, kullanici, anahtar_yolu):
"""SFTP ile dosya gönderir, gerekirse dizin oluşturur"""
sonuc = {"host": host, "basarili": False, "hata": None}
try:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(host, username=kullanici, key_filename=anahtar_yolu, timeout=10)
sftp = ssh.open_sftp()
# Uzak dizini oluştur (yoksa)
uzak_dizin = os.path.dirname(uzak_yol)
try:
sftp.stat(uzak_dizin)
except FileNotFoundError:
ssh.exec_command(f"mkdir -p {uzak_dizin}")
import time
time.sleep(0.5)
# Dosyayı gönder
sftp.put(yerel_yol, uzak_yol)
# Dosya boyutunu doğrula
uzak_boyut = sftp.stat(uzak_yol).st_size
yerel_boyut = os.path.getsize(yerel_yol)
if uzak_boyut != yerel_boyut:
raise Exception(f"Boyut uyuşmazlığı: yerel={yerel_boyut}, uzak={uzak_boyut}")
sftp.close()
ssh.close()
sonuc["basarili"] = True
print(f"[GONDERILDI] {host}: {uzak_yol} ({yerel_boyut} byte)")
except Exception as e:
sonuc["hata"] = str(e)
print(f"[HATA] {host}: {str(e)}")
return sonuc
def toplu_dosya_dagit(sunucular, yerel_yol, uzak_yol, kullanici, anahtar_yolu):
"""Dosyayı tüm sunuculara dağıtır"""
if not os.path.exists(yerel_yol):
raise FileNotFoundError(f"Kaynak dosya bulunamadı: {yerel_yol}")
print(f"Dağıtılıyor: {yerel_yol} -> {uzak_yol} ({len(sunucular)} sunucu)")
with ThreadPoolExecutor(max_workers=15) as executor:
gelecekler = {
executor.submit(sftp_dosya_gonder, host, yerel_yol, uzak_yol, kullanici, anahtar_yolu): host
for host in sunucular
}
sonuclar = [gelecek.result() for gelecek in as_completed(gelecekler)]
basarili = sum(1 for s in sonuclar if s["basarili"])
print(f"nDağıtım tamamlandı: {basarili}/{len(sunucular)}")
return sonuclar
Pratik İpuçları ve Dikkat Edilecekler
Paralel SSH işlerinde öğrendiğim bazı kritik noktalar:
- SSH key authentication şart: Şifre ile paralel bağlantı açmaya çalışmak hem güvensiz hem de çok daha yavaş
- Timeout değerlerini küçümseme: Ağ sorunlarında threadlerinizin sonsuza dek beklemesini önlemek için hem
connecthemexec_commandiçin timeout belirleyin - Known hosts yönetimi:
AutoAddPolicy()lab ortamında kullanışlı ama üretimdeRejectPolicyveya önceden güncellenmiş bir known_hosts dosyası kullanın - Thread sayısına dikkat: Hedef sunucularda
MaxSessionsveMaxStartupsSSH parametreleri aynı anda çok fazla bağlantıyı reddedebilir. Çoğu OpenSSH varsayılanı 10 eş zamanlı unauthenticated bağlantıya izin verir - Çıktıları merkezi loglama sistemine yönlendirin:
print()ile ekrana basmak 200 sunucu için çok gürültülü, sonuçları bir dosyaya veya ELK/Loki gibi bir sisteme gönderin - İdempotent scriptler yazın: Aynı scripti iki kez çalıştırmanın zarar vermeyeceğinden emin olun. Bir thread yarıda kalabilir, retry yapıldığında önceki durumu bozmamak kritik
- Rate limiting: Çok agresif paralel işlem, switch’lerde connection flood alarmı tetikleyebilir. Büyük altyapılarda 50+ thread açmadan önce ağ ekibiyle konuşun
Sonuç
Python ile paralel sunucu yönetimi, sysadmin işini hem hızlandırıyor hem de kod olarak belgelemiş oluyor. ThreadPoolExecutor günlük 50-100 sunucu senaryoları için yeterince hızlı ve anlaşılır. Daha büyük ölçeğe çıktığınızda asyncssh + asyncio kombinasyonu bellek ve performans açısından çok daha avantajlı.
Yazıdaki kodları kopyalayıp doğrudan üretimde çalıştırmayın, kendi altyapınıza göre uyarlayın. Özellikle timeout değerleri, thread sayıları ve hata yönetimi ağ yapınıza ve sunucu sayınıza göre ayar gerektirir. Yeni bir script yazmadan önce her zaman birkaç test sunucusunda deneyin, kuru_calistir=True gibi güvenli modlar ekleyin ve ne yaptığını bilmeden bir şeyi 300 sunucuya uygulamayın.
Son olarak: Fabric ve Ansible gibi olgun araçlar zaten bu problemleri çözmüş durumda. Ama Python’ı sıfırdan anlamak, bu araçların içinde ne döndüğünü kavramak ve özel senaryolara adapte etmek için vazgeçilmez. İkisini birbirinin alternatifi değil, tamamlayıcısı olarak görün.