Günlük yüzlerce gigabyte log üreten sistemlerde bir şeylerin yanlış gittiğini anlamak, samanlıkta iğne aramaktan farksız. Saldırı girişimleri, uygulama hataları, performans sorunları… hepsi log dosyalarının derinliklerinde gömülü bekliyor. Elle okumak imkansız, grep ve awk tek başına yetmiyor, SIEM lisansı da her yerde yok. İşte tam bu noktada Python devreye giriyor ve log analizini hem güçlü hem de tamamen özelleştirilebilir hale getiriyor.
Neden Python ile Log Analizi?
grep ve awk muhteşem araçlar, onları küçümsemiyorum. Ama iş birden fazla log kaynağını ilişkilendirmeye, pattern detection yapmaya veya sonuçları bir rapora dönüştürmeye geldiğinde yetersiz kalıyorlar. Python’un bu işte avantajları şunlar:
- Regex desteği güçlü ve okunabilir
- Birden fazla log formatını aynı script içinde işleyebilirsiniz
- Veriyi yapılandırıp veritabanına veya JSON’a yazabilirsiniz
- Koşullu analizler yapabilirsiniz (örneğin aynı IP’den 5 dakikada 100 istek)
- Alarm mekanizmaları entegre edebilirsiniz
- Sonuçları e-posta, Slack veya Telegram’a gönderebilirsiniz
Haydi pratik senaryolarla başlayalım.
Temel Log Ayrıştırma: Nginx Access Log
En yaygın senaryo: Nginx access log analizi. Bu loglar şu formatta gelir:
192.168.1.105 - frank [10/Oct/2023:13:55:36 -0700] "GET /index.html HTTP/1.1" 200 2326
İlk scriptimizde bu formatı ayrıştırıp anlamlı verilere dönüştüreceğiz:
cat << 'EOF' > parse_nginx.py
import re
import sys
from collections import Counter, defaultdict
from datetime import datetime
# Nginx combined log format için regex
LOG_PATTERN = re.compile(
r'(?P<ip>d+.d+.d+.d+)s+'
r'(?P<ident>-|w+)s+'
r'(?P<user>-|w+)s+'
r'[(?P<time>[^]]+)]s+'
r'"(?P<method>w+)s+(?P<path>[^s]+)s+(?P<protocol>[^"]+)"s+'
r'(?P<status>d+)s+'
r'(?P<size>d+|-)'
)
def parse_log_file(filepath):
entries = []
errors = 0
with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
for line_num, line in enumerate(f, 1):
match = LOG_PATTERN.match(line.strip())
if match:
entry = match.groupdict()
entry['status'] = int(entry['status'])
entry['size'] = int(entry['size']) if entry['size'] != '-' else 0
entries.append(entry)
else:
errors += 1
print(f"Toplam {len(entries)} kayit islendi, {errors} satir parse edilemedi.")
return entries
def analyze_logs(entries):
ip_counter = Counter(e['ip'] for e in entries)
status_counter = Counter(e['status'] for e in entries)
path_counter = Counter(e['path'] for e in entries)
print("n=== TOP 10 IP ADRESI ===")
for ip, count in ip_counter.most_common(10):
print(f" {ip}: {count} istek")
print("n=== HTTP DURUM KODLARI ===")
for status, count in sorted(status_counter.items()):
print(f" {status}: {count} istek")
print("n=== EN COK ISTENEN 10 URL ===")
for path, count in path_counter.most_common(10):
print(f" {path}: {count} istek")
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Kullanim: python parse_nginx.py /var/log/nginx/access.log")
sys.exit(1)
entries = parse_log_file(sys.argv[1])
analyze_logs(entries)
EOF
python3 parse_nginx.py /var/log/nginx/access.log
Gerçek Dünya Senaryosu 1: Brute Force Tespiti
Bir sabah sunucuya bağlandınız ve auth.log’da tuhaf bir aktivite var. SSH brute force saldırısı mı? Python ile bunu otomatik olarak tespit eden bir script yazalım:
cat << 'EOF' > detect_bruteforce.py
import re
from collections import defaultdict
from datetime import datetime, timedelta
# Eşik değerleri
THRESHOLD_COUNT = 10 # Kaç başarısız denemeden sonra alarm
THRESHOLD_WINDOW = 300 # Kaç saniyelik pencerede (5 dakika)
AUTH_LOG = '/var/log/auth.log'
# Başarısız SSH girişimi için pattern
FAILED_PATTERN = re.compile(
r'(?P<month>w+)s+(?P<day>d+)s+(?P<time>d+:d+:d+).*'
r'Failed password for (?:invalid user )?(?P<user>S+) from '
r'(?P<ip>d+.d+.d+.d+)'
)
# Başarılı giriş pattern
SUCCESS_PATTERN = re.compile(
r'(?P<month>w+)s+(?P<day>d+)s+(?P<time>d+:d+:d+).*'
r'Accepted (?:password|publickey) for (?P<user>S+) from '
r'(?P<ip>d+.d+.d+.d+)'
)
def parse_auth_log(filepath):
failed_attempts = defaultdict(list)
successful_logins = []
current_year = datetime.now().year
with open(filepath, 'r', errors='ignore') as f:
for line in f:
failed_match = FAILED_PATTERN.search(line)
if failed_match:
d = failed_match.groupdict()
try:
timestamp_str = f"{current_year} {d['month']} {d['day']} {d['time']}"
timestamp = datetime.strptime(timestamp_str, "%Y %b %d %H:%M:%S")
failed_attempts[d['ip']].append({
'timestamp': timestamp,
'user': d['user']
})
except ValueError:
continue
success_match = SUCCESS_PATTERN.search(line)
if success_match:
successful_logins.append(success_match.groupdict())
return failed_attempts, successful_logins
def detect_bruteforce(failed_attempts):
attackers = {}
for ip, attempts in failed_attempts.items():
attempts.sort(key=lambda x: x['timestamp'])
# Sliding window analizi
for i, attempt in enumerate(attempts):
window_start = attempt['timestamp']
window_end = window_start + timedelta(seconds=THRESHOLD_WINDOW)
window_attempts = [
a for a in attempts[i:]
if a['timestamp'] <= window_end
]
if len(window_attempts) >= THRESHOLD_COUNT:
targeted_users = list(set(a['user'] for a in window_attempts))
attackers[ip] = {
'count': len(attempts),
'window_count': len(window_attempts),
'targeted_users': targeted_users,
'first_attempt': attempts[0]['timestamp'],
'last_attempt': attempts[-1]['timestamp']
}
break
return attackers
def generate_report(attackers, successful_logins):
if not attackers:
print("[OK] Brute force saldirisi tespit edilmedi.")
return
print(f"n[ALARM] {len(attackers)} potansiyel brute force kaynagi tespit edildi!n")
for ip, data in sorted(attackers.items(), key=lambda x: x[1]['count'], reverse=True):
print(f"IP: {ip}")
print(f" Toplam basarisiz deneme: {data['count']}")
print(f" {THRESHOLD_WINDOW}sn pencerede: {data['window_count']} deneme")
print(f" Hedef kullanicilar: {', '.join(data['targeted_users'][:5])}")
print(f" Ilk deneme: {data['first_attempt']}")
print(f" Son deneme: {data['last_attempt']}")
# Bu IP başarılı giriş yapmış mı?
successful_ips = [s['ip'] for s in successful_logins]
if ip in successful_ips:
print(f" [KRITIK] Bu IP BASARILI GIRIS YAPMIS!")
print()
if __name__ == "__main__":
print("Auth log analiz ediliyor...")
failed, successful = parse_auth_log(AUTH_LOG)
print(f"Toplam {sum(len(v) for v in failed.values())} basarisiz giris, "
f"{len(successful)} basarili giris bulundu.")
attackers = detect_bruteforce(failed)
generate_report(attackers, successful)
EOF
python3 detect_bruteforce.py
Bu script çıktısında [KRITIK] görürseniz, o IP’yi hemen iptables’a alın.
Log Rotasyonu ile Başa Çıkmak
Gerçek hayatta loglar rotate edilir: access.log, access.log.1, access.log.2.gz gibi. Tüm bu dosyaları şeffaf şekilde okuyabilen bir utility fonksiyon yazalım:
cat << 'EOF' > log_reader.py
import gzip
import glob
import os
from pathlib import Path
def iter_log_files(base_path, max_rotations=7):
"""
Log dosyalarını kronolojik sırayla okur.
Sıkıştırılmış dosyaları da destekler.
"""
files_to_read = []
base = Path(base_path)
base_dir = base.parent
base_name = base.name
# Ana log dosyası
if base.exists():
files_to_read.append((base.stat().st_mtime, str(base), False))
# Sayısal rotation: .1, .2, .3 ...
for i in range(1, max_rotations + 1):
rotated = base_dir / f"{base_name}.{i}"
rotated_gz = base_dir / f"{base_name}.{i}.gz"
if rotated.exists():
files_to_read.append((rotated.stat().st_mtime, str(rotated), False))
elif rotated_gz.exists():
files_to_read.append((rotated_gz.stat().st_mtime, str(rotated_gz), True))
# Tarih bazlı rotation: access.log-20231015
pattern = str(base_dir / f"{base_name}-*")
for filepath in glob.glob(pattern):
is_gz = filepath.endswith('.gz')
files_to_read.append((os.path.getmtime(filepath), filepath, is_gz))
# Eskiden yeniye sırala
files_to_read.sort(key=lambda x: x[0])
for mtime, filepath, is_compressed in files_to_read:
print(f" Okunuyor: {filepath}")
try:
if is_compressed:
with gzip.open(filepath, 'rt', encoding='utf-8', errors='ignore') as f:
yield from f
else:
with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
yield from f
except (IOError, OSError) as e:
print(f" HATA: {filepath} okunamadi: {e}")
# Kullanim ornegi
if __name__ == "__main__":
line_count = 0
for line in iter_log_files('/var/log/nginx/access.log'):
line_count += 1
print(f"Toplam {line_count} satir okundu.")
EOF
python3 log_reader.py
Gerçek Dünya Senaryosu 2: Uygulama Hata Takibi
Java, Python veya Node.js uygulamalarının ürettiği stack trace içeren logları analiz edelim. Bu tip loglar çok satırlı olduğundan işlemesi daha karmaşık:
cat << 'EOF' > parse_app_errors.py
import re
from collections import defaultdict, Counter
from dataclasses import dataclass, field
from typing import List, Optional
import json
@dataclass
class LogEntry:
timestamp: str
level: str
message: str
stack_trace: List[str] = field(default_factory=list)
logger: Optional[str] = None
# Örnek: 2023-10-15 14:23:45,123 ERROR [com.example.Service] - Connection failed
LOG_LINE_PATTERN = re.compile(
r'(?P<timestamp>d{4}-d{2}-d{2}s+d{2}:d{2}:d{2}[,.]d{3})s+'
r'(?P<level>DEBUG|INFO|WARN|WARNING|ERROR|CRITICAL|FATAL)s+'
r'(?:[(?P<logger>[^]]+)]s+)?'
r'(?:[-:]s+)?(?P<message>.+)'
)
STACK_TRACE_PATTERN = re.compile(r'^s+(ats+|Files+|Caused by:|Exception in)')
def parse_multiline_log(filepath):
entries = []
current_entry = None
with open(filepath, 'r', errors='ignore') as f:
for line in f:
line_stripped = line.rstrip()
# Yeni log satırı mı?
match = LOG_LINE_PATTERN.match(line_stripped)
if match:
if current_entry:
entries.append(current_entry)
d = match.groupdict()
current_entry = LogEntry(
timestamp=d['timestamp'],
level=d['level'],
message=d['message'],
logger=d.get('logger')
)
elif current_entry and (STACK_TRACE_PATTERN.match(line_stripped) or
line_stripped.startswith('t')):
# Stack trace satırı
current_entry.stack_trace.append(line_stripped.strip())
if current_entry:
entries.append(current_entry)
return entries
def analyze_errors(entries):
error_entries = [e for e in entries if e.level in ('ERROR', 'CRITICAL', 'FATAL')]
# Hata mesajlarını grupla (sayısal kısımları normalize et)
def normalize_message(msg):
msg = re.sub(r'bd+b', 'N', msg)
msg = re.sub(r'[0-9a-f]{8}-[0-9a-f-]{27}', 'UUID', msg)
return msg[:80]
error_groups = Counter(normalize_message(e.message) for e in error_entries)
print(f"Toplam log kaydi: {len(entries)}")
print(f"ERROR/CRITICAL/FATAL: {len(error_entries)}")
print(f"n=== EN SIK TEKRARLANAN HATALAR ===")
for msg, count in error_groups.most_common(10):
print(f"n [{count}x] {msg}")
# Bu hata tipine ait ilk stack trace'i göster
for entry in error_entries:
if normalize_message(entry.message) == msg and entry.stack_trace:
print(" Stack trace (ilk 3 satir):")
for line in entry.stack_trace[:3]:
print(f" {line}")
break
# Zaman bazlı analiz: saate göre hata dağılımı
hour_counter = Counter()
for entry in error_entries:
try:
hour = entry.timestamp[11:13]
hour_counter[hour] += 1
except IndexError:
pass
print(f"n=== SAATLIK HATA DAGILIMI ===")
for hour in sorted(hour_counter.keys()):
bar = '#' * (hour_counter[hour] // max(1, max(hour_counter.values()) // 30))
print(f" {hour}:00 | {bar} ({hour_counter[hour]})")
if __name__ == "__main__":
import sys
log_file = sys.argv[1] if len(sys.argv) > 1 else '/var/log/myapp/application.log'
print(f"Analiz ediliyor: {log_file}")
entries = parse_multiline_log(log_file)
analyze_errors(entries)
EOF
python3 parse_app_errors.py /var/log/myapp/application.log
Performans: Büyük Log Dosyalarını Verimli İşlemek
100GB log dosyasını parse etmeye çalışıyorsanız, tüm veriyi RAM’e yüklemek felakete davet çıkarmaktır. Generator tabanlı yaklaşım şart:
cat << 'EOF' > efficient_parser.py
import re
import mmap
from typing import Generator, Dict
import time
# Generator tabanlı parser - RAM dostu
def parse_nginx_generator(filepath: str) -> Generator[Dict, None, None]:
pattern = re.compile(
rb'(d+.d+.d+.d+).+[([^]]+)] '
rb'"(w+) ([^s]+) [^"]+" (d+) (d+|-)'
)
parsed = 0
with open(filepath, 'rb') as f:
# mmap ile memory-mapped okuma
with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
for line in iter(mm.readline, b''):
match = pattern.search(line)
if match:
parsed += 1
yield {
'ip': match.group(1).decode(),
'time': match.group(2).decode(),
'method': match.group(3).decode(),
'path': match.group(4).decode(),
'status': int(match.group(5)),
'size': int(match.group(6)) if match.group(6) != b'-' else 0
}
print(f"Toplam {parsed} kayit islendi.")
def streaming_analysis(filepath: str):
"""Tek geçişte birden fazla metrik hesapla"""
from collections import Counter, defaultdict
ip_count = Counter()
status_count = Counter()
total_bytes = 0
error_ips = defaultdict(int)
start_time = time.time()
record_count = 0
for entry in parse_nginx_generator(filepath):
record_count += 1
ip_count[entry['ip']] += 1
status_count[entry['status']] += 1
total_bytes += entry['size']
if entry['status'] >= 400:
error_ips[entry['ip']] += 1
# İlerleme göster
if record_count % 500000 == 0:
elapsed = time.time() - start_time
print(f" {record_count:,} kayit islendi ({record_count/elapsed:.0f} kayit/sn)")
elapsed = time.time() - start_time
print(f"nToplam sure: {elapsed:.2f}sn, Hiz: {record_count/elapsed:.0f} kayit/sn")
print(f"Toplam veri: {total_bytes / (1024**3):.2f} GB")
print(f"nTop 5 IP: {ip_count.most_common(5)}")
print(f"Top 5 Hata Ureten IP: {sorted(error_ips.items(), key=lambda x: x[1], reverse=True)[:5]}")
print(f"Status dagilimi: {dict(sorted(status_count.items()))}")
if __name__ == "__main__":
import sys
streaming_analysis(sys.argv[1] if len(sys.argv) > 1 else '/var/log/nginx/access.log')
EOF
python3 efficient_parser.py /var/log/nginx/access.log
Gerçek Dünya Senaryosu 3: Otomatik Rapor ve Alarm
Sabah 9’da posta kutunuzda günlük log analiz raporu hazır olsun. Cron ile çalışacak, hem rapor üreten hem de kritik durumda e-posta gönderen bir script:
cat << 'EOF' > daily_log_report.py
#!/usr/bin/env python3
"""
Gunluk log analiz raporu
Cron: 0 8 * * * /usr/bin/python3 /opt/scripts/daily_log_report.py
"""
import re
import smtplib
import json
import os
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from collections import Counter, defaultdict
from datetime import datetime, timedelta
# Konfigürasyon
CONFIG = {
'nginx_log': '/var/log/nginx/access.log',
'auth_log': '/var/log/auth.log',
'report_dir': '/var/reports/logs',
'smtp_host': 'localhost',
'smtp_port': 25,
'from_email': '[email protected]',
'to_emails': ['[email protected]'],
'hostname': os.uname().nodename,
'brute_force_threshold': 20,
'error_rate_threshold': 0.05 # %5 üzeri hata oranı alarm
}
def analyze_nginx_for_report():
results = {
'total_requests': 0,
'unique_ips': set(),
'status_codes': Counter(),
'top_ips': Counter(),
'top_paths': Counter(),
'bytes_served': 0
}
pattern = re.compile(
r'(d+.d+.d+.d+).+[([^]]+)] '
r'"(w+) ([^s]+) [^"]+" (d+) (d+|-)'
)
# Sadece bugünün loglarını al
today = datetime.now().strftime('%d/%b/%Y')
try:
with open(CONFIG['nginx_log'], 'r', errors='ignore') as f:
for line in f:
if today not in line:
continue
match = pattern.search(line)
if not match:
continue
ip, time_str, method, path, status, size = match.groups()
results['total_requests'] += 1
results['unique_ips'].add(ip)
results['status_codes'][int(status)] += 1
results['top_ips'][ip] += 1
results['top_paths'][path] += 1
results['bytes_served'] += int(size) if size != '-' else 0
except FileNotFoundError:
pass
results['unique_ips'] = len(results['unique_ips'])
return results
def check_auth_anomalies():
failed_by_ip = Counter()
today = datetime.now().strftime('%b %d').replace(' 0', ' ')
pattern = re.compile(
r'Failed password for.*from (d+.d+.d+.d+)'
)
try:
with open(CONFIG['auth_log'], 'r', errors='ignore') as f:
for line in f:
if not (line.startswith(today) or
line.startswith(datetime.now().strftime('%b %d'))):
continue
match = pattern.search(line)
if match:
failed_by_ip[match.group(1)] += 1
except FileNotFoundError:
pass
suspicious = {ip: count for ip, count in failed_by_ip.items()
if count >= CONFIG['brute_force_threshold']}
return failed_by_ip, suspicious
def build_report(nginx_data, auth_failed, suspicious_ips):
error_count = sum(v for k, v in nginx_data['status_codes'].items() if k >= 400)
total = nginx_data['total_requests'] or 1
error_rate = error_count / total
is_critical = bool(suspicious_ips) or error_rate > CONFIG['error_rate_threshold']
report_lines = [
f"SUNUCU: {CONFIG['hostname']}",
f"TARIH: {datetime.now().strftime('%Y-%m-%d %H:%M')}",
f"DURUM: {'[ALARM] DIKKAT GEREKTIREN DURUMLAR VAR' if is_critical else '[OK] Normal'}",
"",
"=== NGINX OZETI ===",
f"Toplam Istek: {nginx_data['total_requests']:,}",
f"Benzersiz IP: {nginx_data['unique_ips']:,}",
f"Sunulan Veri: {nginx_data['bytes_served'] / (1024**2):.1f} MB",
f"Hata Orani: {error_rate:.1%}",
"",
"HTTP Durum Kodlari:",
]
for code in sorted(nginx_data['status_codes'].keys()):
report_lines.append(f" {code}: {nginx_data['status_codes'][code]:,}")
report_lines += [
"",
"Top 5 IP:",
]
for ip, count in nginx_data['top_ips'].most_common(5):
report_lines.append(f" {ip}: {count:,} istek")
report_lines += [
"",
"=== GUVENLIK OZETI ===",
f"Basarisiz SSH denemesi: {sum(auth_failed.values())}",
f"Suphe cekici IP sayisi: {len(suspicious_ips)}",
]
if suspicious_ips:
report_lines.append("n[ALARM] BRUTE FORCE SUPHELISI IPLER:")
for ip, count in sorted(suspicious_ips.items(), key=lambda x: x[1], reverse=True):
report_lines.append(f" {ip}: {count} basarisiz deneme")
report_lines.append(f" Onerilen komut: iptables -I INPUT -s {ip} -j DROP")
return "n".join(report_lines), is_critical
def send_email(subject, body):
msg = MIMEMultipart()
msg['From'] = CONFIG['from_email']
msg['To'] = ', '.join(CONFIG['to_emails'])
msg['Subject'] = subject
msg.attach(MIMEText(body, 'plain', 'utf-8'))
try:
with smtplib.SMTP(CONFIG['smtp_host'], CONFIG['smtp_port']) as server:
server.sendmail(CONFIG['from_email'], CONFIG['to_emails'], msg.as_string())
print("E-posta gonderildi.")
except Exception as e:
print(f"E-posta gonderilemedi: {e}")
def save_report(report_text):
os.makedirs(CONFIG['report_dir'], exist_ok=True)
filename = f"report_{datetime.now().strftime('%Y%m%d_%H%M')}.txt"
filepath = os.path.join(CONFIG['report_dir'], filename)
with open(filepath, 'w') as f:
f.write(report_text)
print(f"Rapor kaydedildi: {filepath}")
if __name__ == "__main__":
print("Log analizi basliyor...")
nginx_data = analyze_nginx_for_report()
auth_failed, suspicious_ips = check_auth_anomalies()
report_text, is_critical = build_report(nginx_data, auth_failed, suspicious_ips)
print(report_text)
save_report(report_text)
subject_prefix = "[ALARM]" if is_critical else "[OK]"
subject = f"{subject_prefix} {CONFIG['hostname']} Gunluk Log Raporu - {datetime.now().strftime('%Y-%m-%d')}"
send_email(subject, report_text)
EOF
chmod +x daily_log_report.py
# Cron'a ekle:
# echo "0 8 * * * /usr/bin/python3 /opt/scripts/daily_log_report.py >> /var/log/log_report.log 2>&1" | crontab -
python3 daily_log_report.py
GeoIP ile Coğrafi Analiz
Hangi ülkelerden istek geliyor? Brute force hangi ülkeden? Bunun için geoip2 kütüphanesini kullanalım:
# Önce kütüphaneyi ve MaxMind veritabanını kuralım
pip3 install geoip2
# MaxMind ücretsiz GeoLite2 veritabanını indirin: https://dev.maxmind.com/geoip/geolite2-free-geolocation-data
cat << 'EOF' > geo_analysis.py
import re
import geoip2.database
from collections import Counter
def analyze_geo(log_file, geoip_db='/usr/share/GeoIP/GeoLite2-Country.mmdb'):
ip_pattern = re.compile(r'^(d+.d+.d+.d+)')
ip_counter = Counter()
with open(log_file, 'r', errors='ignore') as f:
for line in f:
match = ip_pattern.match(line)
if match:
ip_counter[match.group(1)] += 1
country_stats = Counter()
unknown_ips = 0
try:
with geoip2.database.Reader(geoip_db) as reader:
for ip, count in ip_counter.items():
try:
response = reader.country(ip)
country = response.country.name or 'Bilinmeyen'
country_stats[country] += count
except Exception:
unknown_ips += count
except FileNotFoundError:
print(f"GeoIP veritabani bulunamadi: {geoip_db}")
print("MaxMind GeoLite2 veritabanini indirip yolu guncelleyin.")
return
print("=== ULKE BAZLI ISTEK DAGILIMI ===")
for country, count in country_stats.most_common(15):
bar = '#' * (count // max(1, max(country_stats.values()) // 40))
print(f" {country:<25} {bar} ({count:,})")
if unknown_ips:
print(f"n Bilinmeyen/Ozel IP: {unknown_ips:,}")
if __name__ == "__main__":
import sys
analyze_geo(sys.argv[1] if len(sys.argv) > 1 else '/var/log/nginx/access.log')
EOF
python3 geo_analysis.py /var/log/nginx/access.log
Scripti Production’a Hazırlamak
Bir scripti “çalışır” yapmak ayrı şeydir, “production-ready” yapmak ayrı. İşte dikkat etmeniz gereken noktalar:
- Logging ekleyin: Script kendi hatalarını da loglasın,
print()yeterli değil - Timeout mekanizması: Çok büyük dosyalarda takılı kalmasın, sinyal handler ekleyin
- Konfigürasyonu dışarı çıkarın: Sabit kodlanmış path’ler yerine argparse veya config dosyası kullanın
- İzinleri kontrol edin: Script root olarak mı çalışmalı? Gerekirse sudoers ile sadece belirli komutlara izin verin
- İdempotent olsun: Aynı logu iki kez işlediğinde duplicate alarm üretmesin, işlenen pozisyonu bir state dosyasına kaydedin
- Hata yakalama: Her dosya okuma işlemini try/except içine alın, bir log dosyası okunamazsa tüm script durmasın
# State dosyası ile duplicate işlemeyi önleme örneği
cat << 'EOF' >> parse_nginx.py
def get_last_position(state_file):
try:
with open(state_file, 'r') as f:
return int(f.read().strip())
except (FileNotFoundError, ValueError):
return 0
def save_position(state_file, position):
with open(state_file, 'w') as f:
f.write(str(position))
# Kullanim:
# state_file = '/var/run/nginx_parser.state'
# last_pos = get_last_position(state_file)
# with open(log_file) as f:
# f.seek(last_pos)
# for line in f:
# process(line)
# save_position(state_file, f.tell())
EOF
Sonuç
Log analizi sysadmin işinin belki de en az “eğlenceli” ama en kritik kısmı. İyi yazılmış Python scriptleri bu işi hem hızlandırır hem de güvenilir hale getirir. Yukarıdaki örneklerde gördüğünüz yaklaşımları kendi altyapınıza adapt ederken şunları aklınızda tutun:
Generator kullanmayı ihmal etmeyin, özellikle büyük dosyalarda. Regex pattern’larınızı re.compile() ile önceden derleyin, döngü içinde her seferinde derleme yapmayın. Tek geçişte birden fazla analiz yapın, aynı dosyayı beş farklı amaçla beş kez okumayın. State yönetimi ekleyin ki logrotate’den sonra kaldığınız yerden devam edebilin.
En önemli nokta: Bu scriptleri yazıp çekmece kapatmayın. Cron’a ekleyin, çıktılarını bir yerde toplayın, zaman içinde baseline oluşturun. “Normal” nedir bilmeden “anormal” tespit edemezsiniz. Python’u bu işte bir kez ciddiye aldığınızda, bir daha “Nginx loglarına bakmak gerekiyor” diye saatlerce terminal başında oturmazsınız.