Prod ortamında bir servis sessiz sedasız çökünce ne olur? Kullanıcılar şikayet etmeye başlayana kadar kimse fark etmez. Monitoring yoksa, sorun yokmuş gibi görünür, ta ki iş yerine gelinceye kadar. Python ile basit ama etkili bir HTTP monitoring sistemi kurarak bu “kör nokta” problemini çözebilir, servislerinizin sağlığını sürekli takip edebilirsiniz.
Bu yazıda sıfırdan başlayarak production’da kullanabileceğiniz bir HTTP monitoring aracı geliştireceğiz. Sadece “200 döndü mü?” kontrolü değil, yanıt süresi, içerik doğrulama, SSL sertifika kontrolü ve alert mekanizması da dahil.
Neden Python ile Custom Monitoring?
Nagios, Zabbix, Datadog gibi araçlar var, biliyorum. Ama bunların hepsinin bir öğrenme eğrisi var, lisans maliyeti var ya da sizin özel ihtiyaçlarınıza uymayan bir yapısı var. Basit bir Python scripti ile şunları kazanırsınız:
- Tam kontrol, her şey sizin elinizde
- Özel iş mantığı ekleyebilirsiniz (örneğin JSON response içindeki belirli bir alanı kontrol etmek)
- Kurulum gerektirmez, sadece Python yeterli
- Mevcut alerting sistemlerinize kolayca entegre edilebilir
- Versiyon kontrolü ile yönetilebilir
Küçük ve orta ölçekli altyapılarda “yeterince iyi” olan şey genellikle basit olandır.
Temel HTTP Kontrolü
Her şey basit bir HTTP isteğiyle başlar. requests kütüphanesi Python’un bu işi için standart silahıdır.
pip install requests
İlk versiyonumuz:
import requests
import time
from datetime import datetime
def check_http(url, timeout=10):
"""
Verilen URL'ye HTTP isteği gönderir ve sonucu döndürür.
"""
result = {
'url': url,
'timestamp': datetime.now().isoformat(),
'status_code': None,
'response_time_ms': None,
'is_up': False,
'error': None
}
try:
start_time = time.time()
response = requests.get(url, timeout=timeout, allow_redirects=True)
end_time = time.time()
result['status_code'] = response.status_code
result['response_time_ms'] = round((end_time - start_time) * 1000, 2)
result['is_up'] = response.status_code < 500
except requests.exceptions.ConnectionError as e:
result['error'] = f"Bağlantı hatası: {str(e)}"
except requests.exceptions.Timeout:
result['error'] = f"Zaman aşımı ({timeout}s)"
except requests.exceptions.RequestException as e:
result['error'] = f"İstek hatası: {str(e)}"
return result
# Test edelim
if __name__ == "__main__":
urls = [
"https://httpbin.org/get",
"https://httpbin.org/status/503",
"https://httpbin.org/delay/15", # timeout testi
]
for url in urls:
sonuc = check_http(url, timeout=5)
durum = "YUKARI" if sonuc['is_up'] else "ASAGI"
print(f"[{durum}] {url}")
print(f" Status: {sonuc['status_code']}")
print(f" Yanıt süresi: {sonuc['response_time_ms']}ms")
if sonuc['error']:
print(f" Hata: {sonuc['error']}")
print()
Bu temel yapı işe yarıyor ama production için yetersiz. Şimdi üzerine katman katman özellik ekleyelim.
Konfigürasyon Dosyası ile Servis Tanımları
Kontrol edilecek servisleri hardcode yazmak yerine bir YAML dosyasından okuyalım. Böylece scripti değiştirmeden yeni servis ekleyebilirsiniz.
pip install pyyaml
services.yaml dosyası:
# services.yaml
services:
- name: "Ana Websitesi"
url: "https://example.com"
expected_status: 200
timeout: 10
check_interval: 60
content_check: "Hoşgeldiniz"
- name: "API Health Endpoint"
url: "https://api.example.com/health"
expected_status: 200
timeout: 5
check_interval: 30
content_check: '{"status": "ok"}'
- name: "Admin Paneli"
url: "https://admin.example.com"
expected_status: 200
timeout: 15
check_interval: 120
- name: "Ödeme Servisi"
url: "https://payment.example.com/ping"
expected_status: 200
timeout: 8
check_interval: 30
alert:
email:
enabled: true
smtp_host: "smtp.gmail.com"
smtp_port: 587
from_addr: "[email protected]"
to_addrs:
- "[email protected]"
- "[email protected]"
slack:
enabled: true
webhook_url: "https://hooks.slack.com/services/XXX/YYY/ZZZ"
Konfigürasyonu okuyup servisleri yöneten sınıf:
import yaml
import json
import logging
from dataclasses import dataclass, field
from typing import Optional, List
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
@dataclass
class ServiceConfig:
name: str
url: str
expected_status: int = 200
timeout: int = 10
check_interval: int = 60
content_check: Optional[str] = None
headers: dict = field(default_factory=dict)
consecutive_failures: int = 0
is_currently_down: bool = False
def load_config(config_file: str) -> dict:
"""YAML konfigürasyon dosyasını yükler."""
try:
with open(config_file, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
logger.info(f"{len(config['services'])} servis konfigürasyonu yüklendi")
return config
except FileNotFoundError:
logger.error(f"Konfigürasyon dosyası bulunamadı: {config_file}")
raise
except yaml.YAMLError as e:
logger.error(f"YAML parse hatası: {e}")
raise
def parse_services(config: dict) -> List[ServiceConfig]:
"""Konfigürasyondan ServiceConfig listesi oluşturur."""
services = []
for svc in config.get('services', []):
services.append(ServiceConfig(
name=svc['name'],
url=svc['url'],
expected_status=svc.get('expected_status', 200),
timeout=svc.get('timeout', 10),
check_interval=svc.get('check_interval', 60),
content_check=svc.get('content_check'),
headers=svc.get('headers', {})
))
return services
Gelişmiş Kontrol Fonksiyonu
Temel HTTP kontrolüne içerik doğrulama, SSL kontrolü ve özel header desteği ekleyelim:
import ssl
import socket
from datetime import datetime, timezone
def check_ssl_expiry(hostname: str, port: int = 443) -> Optional[int]:
"""
SSL sertifikasının kaç gün sonra dolacağını döndürür.
Negatif değer sertifikanın zaten dolduğunu gösterir.
"""
try:
context = ssl.create_default_context()
with socket.create_connection((hostname, port), timeout=10) as sock:
with context.wrap_socket(sock, server_hostname=hostname) as ssock:
cert = ssock.getpeercert()
expire_date_str = cert['notAfter']
expire_date = datetime.strptime(
expire_date_str,
'%b %d %H:%M:%S %Y %Z'
).replace(tzinfo=timezone.utc)
now = datetime.now(timezone.utc)
days_remaining = (expire_date - now).days
return days_remaining
except Exception as e:
logger.warning(f"SSL kontrolü başarısız ({hostname}): {e}")
return None
def advanced_check(service: ServiceConfig) -> dict:
"""
Gelişmiş servis kontrolü yapar.
HTTP durumu, yanıt süresi, içerik ve SSL kontrolü içerir.
"""
from urllib.parse import urlparse
result = {
'name': service.name,
'url': service.url,
'timestamp': datetime.now().isoformat(),
'status_code': None,
'response_time_ms': None,
'is_up': False,
'content_check_passed': None,
'ssl_days_remaining': None,
'error': None,
'details': []
}
# SSL kontrolü (sadece HTTPS için)
parsed = urlparse(service.url)
if parsed.scheme == 'https':
ssl_days = check_ssl_expiry(parsed.hostname)
result['ssl_days_remaining'] = ssl_days
if ssl_days is not None and ssl_days < 14:
result['details'].append(
f"UYARI: SSL sertifikası {ssl_days} gün sonra doluyor!"
)
# HTTP kontrolü
try:
start_time = time.time()
response = requests.get(
service.url,
timeout=service.timeout,
headers=service.headers,
allow_redirects=True,
verify=True # SSL doğrulamasını atlamayın!
)
end_time = time.time()
result['status_code'] = response.status_code
result['response_time_ms'] = round((end_time - start_time) * 1000, 2)
# Status code kontrolü
if response.status_code == service.expected_status:
result['is_up'] = True
else:
result['details'].append(
f"Beklenen status: {service.expected_status}, "
f"Gelen: {response.status_code}"
)
# İçerik kontrolü
if service.content_check:
if service.content_check in response.text:
result['content_check_passed'] = True
else:
result['content_check_passed'] = False
result['is_up'] = False
result['details'].append(
f"İçerik kontrolü başarısız: '{service.content_check}' bulunamadı"
)
# Yavaş yanıt uyarısı
if result['response_time_ms'] > 3000:
result['details'].append(
f"Yavaş yanıt: {result['response_time_ms']}ms"
)
except requests.exceptions.ConnectionError:
result['error'] = "Bağlantı kurulamadı"
except requests.exceptions.Timeout:
result['error'] = f"Zaman aşımı ({service.timeout}s)"
except requests.exceptions.SSLError as e:
result['error'] = f"SSL hatası: {str(e)}"
except requests.exceptions.RequestException as e:
result['error'] = f"İstek hatası: {str(e)}"
return result
Alert Mekanizması
Sadece kontrol etmek yetmez, bir şeyler bozulduğunda haberdar olmanız gerekir. Email ve Slack entegrasyonu ekleyelim:
import smtplib
import urllib.request
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
class AlertManager:
def __init__(self, alert_config: dict):
self.config = alert_config
self.alert_cooldown = {} # aynı servis için tekrar alert göndermemek için
self.cooldown_minutes = 30
def should_alert(self, service_name: str) -> bool:
"""Cooldown kontrolü, aynı servis için sürekli alert göndermez."""
if service_name not in self.alert_cooldown:
return True
last_alert = self.alert_cooldown[service_name]
elapsed = (datetime.now() - last_alert).total_seconds() / 60
return elapsed >= self.cooldown_minutes
def send_email_alert(self, subject: str, body: str):
"""SMTP üzerinden email alert gönderir."""
email_config = self.config.get('email', {})
if not email_config.get('enabled', False):
return
try:
msg = MIMEMultipart('alternative')
msg['Subject'] = subject
msg['From'] = email_config['from_addr']
msg['To'] = ', '.join(email_config['to_addrs'])
html_body = f"""
<html>
<body style="font-family: Arial, sans-serif;">
<h2 style="color: #d32f2f;">Servis Uyarısı</h2>
<pre style="background: #f5f5f5; padding: 15px; border-radius: 4px;">
{body}
</pre>
<p style="color: #666; font-size: 12px;">
Bu mesaj otomatik monitoring sistemi tarafından gönderilmiştir.
</p>
</body>
</html>
"""
msg.attach(MIMEText(body, 'plain'))
msg.attach(MIMEText(html_body, 'html'))
with smtplib.SMTP(email_config['smtp_host'], email_config['smtp_port']) as server:
server.starttls()
server.login(
email_config.get('username', email_config['from_addr']),
email_config['password']
)
server.sendmail(
email_config['from_addr'],
email_config['to_addrs'],
msg.as_string()
)
logger.info(f"Email alert gönderildi: {subject}")
except Exception as e:
logger.error(f"Email gönderilemedi: {e}")
def send_slack_alert(self, message: str, is_recovery: bool = False):
"""Slack webhook üzerinden mesaj gönderir."""
slack_config = self.config.get('slack', {})
if not slack_config.get('enabled', False):
return
emoji = ":white_check_mark:" if is_recovery else ":red_circle:"
color = "#36a64f" if is_recovery else "#d32f2f"
payload = json.dumps({
"attachments": [{
"color": color,
"text": f"{emoji} {message}",
"footer": "HTTP Monitor",
"ts": int(time.time())
}]
}).encode('utf-8')
try:
req = urllib.request.Request(
slack_config['webhook_url'],
data=payload,
headers={'Content-Type': 'application/json'}
)
urllib.request.urlopen(req, timeout=5)
logger.info("Slack alert gönderildi")
except Exception as e:
logger.error(f"Slack alert gönderilemedi: {e}")
def trigger_alert(self, service: ServiceConfig, check_result: dict):
"""Alert koşulunu değerlendirir ve gerekirse alert gönderir."""
if not self.should_alert(service.name):
return
subject = f"ALERT: {service.name} servisinde sorun!"
body = (
f"Servis: {service.name}n"
f"URL: {service.url}n"
f"Zaman: {check_result['timestamp']}n"
f"Status Kodu: {check_result.get('status_code', 'N/A')}n"
f"Yanıt Süresi: {check_result.get('response_time_ms', 'N/A')}msn"
f"Hata: {check_result.get('error', 'Yok')}n"
)
if check_result.get('details'):
body += "nDetaylar:n"
for detail in check_result['details']:
body += f" - {detail}n"
self.send_email_alert(subject, body)
self.send_slack_alert(f"*{service.name}* servisi çöktü! URL: {service.url}")
self.alert_cooldown[service.name] = datetime.now()
def trigger_recovery(self, service: ServiceConfig):
"""Servis kurtarıldığında recovery alert gönderir."""
message = f"*{service.name}* servisi tekrar ayağa kalktı!"
self.send_slack_alert(message, is_recovery=True)
if service.name in self.alert_cooldown:
del self.alert_cooldown[service.name]
Sonuçları Kaydetme ve Raporlama
Kontrol sonuçlarını bir JSON dosyasına kaydedelim. Basit ama işlevsel:
import os
from pathlib import Path
class ResultStore:
def __init__(self, data_dir: str = "/var/log/http-monitor"):
self.data_dir = Path(data_dir)
self.data_dir.mkdir(parents=True, exist_ok=True)
self.results_file = self.data_dir / "results.json"
self.results = self._load_existing()
def _load_existing(self) -> dict:
if self.results_file.exists():
try:
with open(self.results_file, 'r') as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
return {}
return {}
def save_result(self, service_name: str, result: dict):
"""Sonucu kaydeder, her servis için son 100 sonucu tutar."""
if service_name not in self.results:
self.results[service_name] = []
self.results[service_name].append(result)
# Son 100 sonucu tut
if len(self.results[service_name]) > 100:
self.results[service_name] = self.results[service_name][-100:]
with open(self.results_file, 'w') as f:
json.dump(self.results, f, indent=2, ensure_ascii=False)
def get_uptime_percentage(self, service_name: str, last_n: int = 20) -> float:
"""Son N kontrol için uptime yüzdesini hesaplar."""
if service_name not in self.results:
return 0.0
checks = self.results[service_name][-last_n:]
if not checks:
return 0.0
up_count = sum(1 for c in checks if c.get('is_up', False))
return round((up_count / len(checks)) * 100, 2)
def generate_report(self) -> str:
"""Tüm servisler için özet rapor oluşturur."""
report = ["=" * 50]
report.append("HTTP MONİTORİNG RAPORU")
report.append(f"Zaman: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
report.append("=" * 50)
for service_name, checks in self.results.items():
if not checks:
continue
last_check = checks[-1]
uptime = self.get_uptime_percentage(service_name)
avg_response = None
response_times = [
c['response_time_ms'] for c in checks[-20:]
if c.get('response_time_ms')
]
if response_times:
avg_response = round(sum(response_times) / len(response_times), 2)
durum = "YUKARI" if last_check.get('is_up') else "ASAGI"
report.append(f"n{service_name}")
report.append(f" Durum: {durum}")
report.append(f" Uptime (son 20 kontrol): %{uptime}")
if avg_response:
report.append(f" Ort. Yanıt Süresi: {avg_response}ms")
report.append(f" Son Kontrol: {last_check.get('timestamp', 'N/A')}")
return "n".join(report)
Ana Monitoring Döngüsü
Tüm parçaları bir araya getirelim:
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
class HTTPMonitor:
def __init__(self, config_file: str = "services.yaml"):
config = load_config(config_file)
self.services = parse_services(config)
self.alert_manager = AlertManager(config.get('alert', {}))
self.store = ResultStore()
self.running = False
self._check_timers = {}
def check_and_process(self, service: ServiceConfig):
"""Tek bir servis için kontrol yapar ve sonucu işler."""
result = advanced_check(service)
self.store.save_result(service.name, result)
durum = "OK" if result['is_up'] else "FAIL"
ms = result.get('response_time_ms', 'N/A')
logger.info(f"[{durum}] {service.name} - {ms}ms - {service.url}")
if result.get('details'):
for detail in result['details']:
logger.warning(f" {service.name}: {detail}")
# Alert mantığı
if not result['is_up']:
service.consecutive_failures += 1
# 2 ardışık başarısızlıktan sonra alert gönder (false positive'i azaltır)
if service.consecutive_failures >= 2 and not service.is_currently_down:
service.is_currently_down = True
self.alert_manager.trigger_alert(service, result)
logger.error(f"ALERT: {service.name} servisi çöktü!")
else:
if service.is_currently_down:
service.is_currently_down = False
self.alert_manager.trigger_recovery(service)
logger.info(f"RECOVERY: {service.name} servisi kurtarıldı!")
service.consecutive_failures = 0
return result
def run_once(self):
"""Tüm servisleri bir kez paralel olarak kontrol eder."""
with ThreadPoolExecutor(max_workers=10) as executor:
futures = {
executor.submit(self.check_and_process, svc): svc
for svc in self.services
}
for future in as_completed(futures):
svc = futures[future]
try:
future.result()
except Exception as e:
logger.error(f"{svc.name} kontrolünde beklenmeyen hata: {e}")
def run_continuous(self):
"""Servisleri kendi interval'larında sürekli kontrol eder."""
self.running = True
logger.info(f"Monitoring başlatıldı. {len(self.services)} servis takip ediliyor.")
def check_service_loop(service: ServiceConfig):
while self.running:
self.check_and_process(service)
time.sleep(service.check_interval)
threads = []
for service in self.services:
t = threading.Thread(
target=check_service_loop,
args=(service,),
name=f"monitor-{service.name}",
daemon=True
)
t.start()
threads.append(t)
time.sleep(0.5) # Tüm servislerin aynı anda başlamasını önle
try:
while self.running:
time.sleep(300) # Her 5 dakikada rapor yazdır
print(self.store.generate_report())
except KeyboardInterrupt:
logger.info("Monitoring durduruluyor...")
self.running = False
if __name__ == "__main__":
import sys
config_file = sys.argv[1] if len(sys.argv) > 1 else "services.yaml"
monitor = HTTPMonitor(config_file)
if "--once" in sys.argv:
monitor.run_once()
print(monitor.store.generate_report())
else:
monitor.run_continuous()
Systemd ile Servis Olarak Çalıştırma
Scripti production’da sürekli çalışacak şekilde systemd servisine dönüştürelim:
# /etc/systemd/system/http-monitor.service
[Unit]
Description=HTTP Monitoring Service
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
User=monitoring
Group=monitoring
WorkingDirectory=/opt/http-monitor
ExecStart=/opt/http-monitor/venv/bin/python monitor.py /opt/http-monitor/services.yaml
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
SyslogIdentifier=http-monitor
# Güvenlik kısıtlamaları
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ReadWritePaths=/var/log/http-monitor
[Install]
WantedBy=multi-user.target
Servisi etkinleştirip başlatmak için:
# Kullanıcı oluştur
useradd -r -s /bin/false -m -d /opt/http-monitor monitoring
# Dizin ve izinleri ayarla
mkdir -p /opt/http-monitor
cp monitor.py services.yaml /opt/http-monitor/
chown -R monitoring:monitoring /opt/http-monitor
# Virtual environment oluştur
sudo -u monitoring python3 -m venv /opt/http-monitor/venv
sudo -u monitoring /opt/http-monitor/venv/bin/pip install requests pyyaml
# Log dizini
mkdir -p /var/log/http-monitor
chown monitoring:monitoring /var/log/http-monitor
# Servisi başlat
systemctl daemon-reload
systemctl enable http-monitor
systemctl start http-monitor
# Durumu kontrol et
systemctl status http-monitor
journalctl -u http-monitor -f
Gerçek Dünya Senaryoları
Senaryo 1: API endpoint JSON kontrolü
Bir e-ticaret sitesinin ödeme API’si “200 OK” dönüyor ama içeride bir hata var. content_check alanına "success": true yazarsanız bu durumu yakalarsınız. Sadece status kod kontrolü bu tür “sessiz başarısızlıkları” atlar.
Senaryo 2: SSL sertifika yenileme hatırlatması
Let’s Encrypt sertifikalarının 90 günlük ömrü var ve auto-renewal bazen sessizce başarısız olabiliyor. Bu scriptin SSL kontrolü 14 gün öncesinden sizi uyaracak ve prod’da sertifika dolduğu için site kapanmadan önce müdahale etme imkanı verecek.
Senaryo 3: Ardışık başarısızlık mantığı
consecutive_failures >= 2 koşulu önemli. Tek bir başarısız istek network titremesi olabilir. İki ardışık başarısızlık gerçek bir sorunu işaret eder. Bu şekilde false positive alert oranını önemli ölçüde düşürürsünüz.
Senaryo 4: Farklı interval’lar
Ödeme servisi gibi kritik endpointler 30 saniyede bir, admin paneli gibi daha az kritik olanlar 2 dakikada bir kontrol edilebilir. Her servisin kendi check_interval değeri olması bu esnekliği sağlar.
Cron ile Basit Alternatif
Sürekli çalışan bir daemon istemiyorsanız, cron ile --once modunu kullanabilirsiniz:
# Crontab - her 5 dakikada bir kontrol
*/5 * * * * /opt/http-monitor/venv/bin/python /opt/http-monitor/monitor.py /opt/http-monitor/services.yaml --once >> /var/log/http-monitor/cron.log 2>&1
# Her sabah 08:00'de rapor emaili gönder
0 8 * * * /opt/http-monitor/venv/bin/python /opt/http-monitor/report.py | mail -s "Günlük Monitoring Raporu" [email protected]
Sonuç
Bu yazıda sıfırdan bir HTTP monitoring sistemi inşa ettik. Temel HTTP kontrolünden başlayıp YAML konfigürasyonu, SSL sertifika takibi, içerik doğrulama, akıllı alert mekanizması ve systemd entegrasyonuna kadar production’a hazır bir araç ortaya çıkardık.
Tabii ki bu scripti ihtiyaçlarınıza göre genişletebilirsiniz. POST istekleri için destek, authentication header’ları, metrikleri Prometheus’a push etme, ya da sonuçları bir SQLite veritabanına kaydetme bunların hepsi eklenebilir özellikler.
Önemli olan nokta şu: Nagios kurmak için 2 gün harcamak yerine, bu kadar Python bilerek kendi ihtiyaçlarınıza tam uyan bir araç 2 saatte yazabilirsiniz. Büyük araçlar büyük sorunlar içindir. Küçük altyapılar için “yeterince iyi” olan şey genellikle en iyi çözümdür.
Scripti clone’layıp kendi servislerinize adapte edin, services.yaml dosyasını düzenleyin ve monitoring’e başlayın. Bir dahaki servis çöküşünü kullanıcılarınızdan değil, kendi sisteminizden öğrenin.