Log Dosyası Analizi: Python ile Log Ayrıştırma

Günlük yüzlerce gigabyte log üreten sistemlerde bir şeylerin yanlış gittiğini anlamak, samanlıkta iğne aramaktan farksız. Saldırı girişimleri, uygulama hataları, performans sorunları… hepsi log dosyalarının derinliklerinde gömülü bekliyor. Elle okumak imkansız, grep ve awk tek başına yetmiyor, SIEM lisansı da her yerde yok. İşte tam bu noktada Python devreye giriyor ve log analizini hem güçlü hem de tamamen özelleştirilebilir hale getiriyor.

Neden Python ile Log Analizi?

grep ve awk muhteşem araçlar, onları küçümsemiyorum. Ama iş birden fazla log kaynağını ilişkilendirmeye, pattern detection yapmaya veya sonuçları bir rapora dönüştürmeye geldiğinde yetersiz kalıyorlar. Python’un bu işte avantajları şunlar:

  • Regex desteği güçlü ve okunabilir
  • Birden fazla log formatını aynı script içinde işleyebilirsiniz
  • Veriyi yapılandırıp veritabanına veya JSON’a yazabilirsiniz
  • Koşullu analizler yapabilirsiniz (örneğin aynı IP’den 5 dakikada 100 istek)
  • Alarm mekanizmaları entegre edebilirsiniz
  • Sonuçları e-posta, Slack veya Telegram’a gönderebilirsiniz

Haydi pratik senaryolarla başlayalım.

Temel Log Ayrıştırma: Nginx Access Log

En yaygın senaryo: Nginx access log analizi. Bu loglar şu formatta gelir:

192.168.1.105 - frank [10/Oct/2023:13:55:36 -0700] "GET /index.html HTTP/1.1" 200 2326

İlk scriptimizde bu formatı ayrıştırıp anlamlı verilere dönüştüreceğiz:

cat << 'EOF' > parse_nginx.py
import re
import sys
from collections import Counter, defaultdict
from datetime import datetime

# Nginx combined log format için regex
LOG_PATTERN = re.compile(
    r'(?P<ip>d+.d+.d+.d+)s+'
    r'(?P<ident>-|w+)s+'
    r'(?P<user>-|w+)s+'
    r'[(?P<time>[^]]+)]s+'
    r'"(?P<method>w+)s+(?P<path>[^s]+)s+(?P<protocol>[^"]+)"s+'
    r'(?P<status>d+)s+'
    r'(?P<size>d+|-)'
)

def parse_log_file(filepath):
    entries = []
    errors = 0
    
    with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
        for line_num, line in enumerate(f, 1):
            match = LOG_PATTERN.match(line.strip())
            if match:
                entry = match.groupdict()
                entry['status'] = int(entry['status'])
                entry['size'] = int(entry['size']) if entry['size'] != '-' else 0
                entries.append(entry)
            else:
                errors += 1
    
    print(f"Toplam {len(entries)} kayit islendi, {errors} satir parse edilemedi.")
    return entries

def analyze_logs(entries):
    ip_counter = Counter(e['ip'] for e in entries)
    status_counter = Counter(e['status'] for e in entries)
    path_counter = Counter(e['path'] for e in entries)
    
    print("n=== TOP 10 IP ADRESI ===")
    for ip, count in ip_counter.most_common(10):
        print(f"  {ip}: {count} istek")
    
    print("n=== HTTP DURUM KODLARI ===")
    for status, count in sorted(status_counter.items()):
        print(f"  {status}: {count} istek")
    
    print("n=== EN COK ISTENEN 10 URL ===")
    for path, count in path_counter.most_common(10):
        print(f"  {path}: {count} istek")

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Kullanim: python parse_nginx.py /var/log/nginx/access.log")
        sys.exit(1)
    
    entries = parse_log_file(sys.argv[1])
    analyze_logs(entries)
EOF
python3 parse_nginx.py /var/log/nginx/access.log

Gerçek Dünya Senaryosu 1: Brute Force Tespiti

Bir sabah sunucuya bağlandınız ve auth.log’da tuhaf bir aktivite var. SSH brute force saldırısı mı? Python ile bunu otomatik olarak tespit eden bir script yazalım:

cat << 'EOF' > detect_bruteforce.py
import re
from collections import defaultdict
from datetime import datetime, timedelta

# Eşik değerleri
THRESHOLD_COUNT = 10      # Kaç başarısız denemeden sonra alarm
THRESHOLD_WINDOW = 300    # Kaç saniyelik pencerede (5 dakika)

AUTH_LOG = '/var/log/auth.log'

# Başarısız SSH girişimi için pattern
FAILED_PATTERN = re.compile(
    r'(?P<month>w+)s+(?P<day>d+)s+(?P<time>d+:d+:d+).*'
    r'Failed password for (?:invalid user )?(?P<user>S+) from '
    r'(?P<ip>d+.d+.d+.d+)'
)

# Başarılı giriş pattern
SUCCESS_PATTERN = re.compile(
    r'(?P<month>w+)s+(?P<day>d+)s+(?P<time>d+:d+:d+).*'
    r'Accepted (?:password|publickey) for (?P<user>S+) from '
    r'(?P<ip>d+.d+.d+.d+)'
)

def parse_auth_log(filepath):
    failed_attempts = defaultdict(list)
    successful_logins = []
    current_year = datetime.now().year
    
    with open(filepath, 'r', errors='ignore') as f:
        for line in f:
            failed_match = FAILED_PATTERN.search(line)
            if failed_match:
                d = failed_match.groupdict()
                try:
                    timestamp_str = f"{current_year} {d['month']} {d['day']} {d['time']}"
                    timestamp = datetime.strptime(timestamp_str, "%Y %b %d %H:%M:%S")
                    failed_attempts[d['ip']].append({
                        'timestamp': timestamp,
                        'user': d['user']
                    })
                except ValueError:
                    continue
            
            success_match = SUCCESS_PATTERN.search(line)
            if success_match:
                successful_logins.append(success_match.groupdict())
    
    return failed_attempts, successful_logins

def detect_bruteforce(failed_attempts):
    attackers = {}
    
    for ip, attempts in failed_attempts.items():
        attempts.sort(key=lambda x: x['timestamp'])
        
        # Sliding window analizi
        for i, attempt in enumerate(attempts):
            window_start = attempt['timestamp']
            window_end = window_start + timedelta(seconds=THRESHOLD_WINDOW)
            
            window_attempts = [
                a for a in attempts[i:]
                if a['timestamp'] <= window_end
            ]
            
            if len(window_attempts) >= THRESHOLD_COUNT:
                targeted_users = list(set(a['user'] for a in window_attempts))
                attackers[ip] = {
                    'count': len(attempts),
                    'window_count': len(window_attempts),
                    'targeted_users': targeted_users,
                    'first_attempt': attempts[0]['timestamp'],
                    'last_attempt': attempts[-1]['timestamp']
                }
                break
    
    return attackers

def generate_report(attackers, successful_logins):
    if not attackers:
        print("[OK] Brute force saldirisi tespit edilmedi.")
        return
    
    print(f"n[ALARM] {len(attackers)} potansiyel brute force kaynagi tespit edildi!n")
    
    for ip, data in sorted(attackers.items(), key=lambda x: x[1]['count'], reverse=True):
        print(f"IP: {ip}")
        print(f"  Toplam basarisiz deneme: {data['count']}")
        print(f"  {THRESHOLD_WINDOW}sn pencerede: {data['window_count']} deneme")
        print(f"  Hedef kullanicilar: {', '.join(data['targeted_users'][:5])}")
        print(f"  Ilk deneme: {data['first_attempt']}")
        print(f"  Son deneme: {data['last_attempt']}")
        
        # Bu IP başarılı giriş yapmış mı?
        successful_ips = [s['ip'] for s in successful_logins]
        if ip in successful_ips:
            print(f"  [KRITIK] Bu IP BASARILI GIRIS YAPMIS!")
        print()

if __name__ == "__main__":
    print("Auth log analiz ediliyor...")
    failed, successful = parse_auth_log(AUTH_LOG)
    print(f"Toplam {sum(len(v) for v in failed.values())} basarisiz giris, "
          f"{len(successful)} basarili giris bulundu.")
    
    attackers = detect_bruteforce(failed)
    generate_report(attackers, successful)
EOF
python3 detect_bruteforce.py

Bu script çıktısında [KRITIK] görürseniz, o IP’yi hemen iptables’a alın.

Log Rotasyonu ile Başa Çıkmak

Gerçek hayatta loglar rotate edilir: access.log, access.log.1, access.log.2.gz gibi. Tüm bu dosyaları şeffaf şekilde okuyabilen bir utility fonksiyon yazalım:

cat << 'EOF' > log_reader.py
import gzip
import glob
import os
from pathlib import Path

def iter_log_files(base_path, max_rotations=7):
    """
    Log dosyalarını kronolojik sırayla okur.
    Sıkıştırılmış dosyaları da destekler.
    """
    files_to_read = []
    
    base = Path(base_path)
    base_dir = base.parent
    base_name = base.name
    
    # Ana log dosyası
    if base.exists():
        files_to_read.append((base.stat().st_mtime, str(base), False))
    
    # Sayısal rotation: .1, .2, .3 ...
    for i in range(1, max_rotations + 1):
        rotated = base_dir / f"{base_name}.{i}"
        rotated_gz = base_dir / f"{base_name}.{i}.gz"
        
        if rotated.exists():
            files_to_read.append((rotated.stat().st_mtime, str(rotated), False))
        elif rotated_gz.exists():
            files_to_read.append((rotated_gz.stat().st_mtime, str(rotated_gz), True))
    
    # Tarih bazlı rotation: access.log-20231015
    pattern = str(base_dir / f"{base_name}-*")
    for filepath in glob.glob(pattern):
        is_gz = filepath.endswith('.gz')
        files_to_read.append((os.path.getmtime(filepath), filepath, is_gz))
    
    # Eskiden yeniye sırala
    files_to_read.sort(key=lambda x: x[0])
    
    for mtime, filepath, is_compressed in files_to_read:
        print(f"  Okunuyor: {filepath}")
        try:
            if is_compressed:
                with gzip.open(filepath, 'rt', encoding='utf-8', errors='ignore') as f:
                    yield from f
            else:
                with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                    yield from f
        except (IOError, OSError) as e:
            print(f"  HATA: {filepath} okunamadi: {e}")

# Kullanim ornegi
if __name__ == "__main__":
    line_count = 0
    for line in iter_log_files('/var/log/nginx/access.log'):
        line_count += 1
    print(f"Toplam {line_count} satir okundu.")
EOF
python3 log_reader.py

Gerçek Dünya Senaryosu 2: Uygulama Hata Takibi

Java, Python veya Node.js uygulamalarının ürettiği stack trace içeren logları analiz edelim. Bu tip loglar çok satırlı olduğundan işlemesi daha karmaşık:

cat << 'EOF' > parse_app_errors.py
import re
from collections import defaultdict, Counter
from dataclasses import dataclass, field
from typing import List, Optional
import json

@dataclass
class LogEntry:
    timestamp: str
    level: str
    message: str
    stack_trace: List[str] = field(default_factory=list)
    logger: Optional[str] = None

# Örnek: 2023-10-15 14:23:45,123 ERROR [com.example.Service] - Connection failed
LOG_LINE_PATTERN = re.compile(
    r'(?P<timestamp>d{4}-d{2}-d{2}s+d{2}:d{2}:d{2}[,.]d{3})s+'
    r'(?P<level>DEBUG|INFO|WARN|WARNING|ERROR|CRITICAL|FATAL)s+'
    r'(?:[(?P<logger>[^]]+)]s+)?'
    r'(?:[-:]s+)?(?P<message>.+)'
)

STACK_TRACE_PATTERN = re.compile(r'^s+(ats+|Files+|Caused by:|Exception in)')

def parse_multiline_log(filepath):
    entries = []
    current_entry = None
    
    with open(filepath, 'r', errors='ignore') as f:
        for line in f:
            line_stripped = line.rstrip()
            
            # Yeni log satırı mı?
            match = LOG_LINE_PATTERN.match(line_stripped)
            if match:
                if current_entry:
                    entries.append(current_entry)
                
                d = match.groupdict()
                current_entry = LogEntry(
                    timestamp=d['timestamp'],
                    level=d['level'],
                    message=d['message'],
                    logger=d.get('logger')
                )
            elif current_entry and (STACK_TRACE_PATTERN.match(line_stripped) or 
                                     line_stripped.startswith('t')):
                # Stack trace satırı
                current_entry.stack_trace.append(line_stripped.strip())
            
        if current_entry:
            entries.append(current_entry)
    
    return entries

def analyze_errors(entries):
    error_entries = [e for e in entries if e.level in ('ERROR', 'CRITICAL', 'FATAL')]
    
    # Hata mesajlarını grupla (sayısal kısımları normalize et)
    def normalize_message(msg):
        msg = re.sub(r'bd+b', 'N', msg)
        msg = re.sub(r'[0-9a-f]{8}-[0-9a-f-]{27}', 'UUID', msg)
        return msg[:80]
    
    error_groups = Counter(normalize_message(e.message) for e in error_entries)
    
    print(f"Toplam log kaydi: {len(entries)}")
    print(f"ERROR/CRITICAL/FATAL: {len(error_entries)}")
    print(f"n=== EN SIK TEKRARLANAN HATALAR ===")
    
    for msg, count in error_groups.most_common(10):
        print(f"n  [{count}x] {msg}")
        
        # Bu hata tipine ait ilk stack trace'i göster
        for entry in error_entries:
            if normalize_message(entry.message) == msg and entry.stack_trace:
                print("  Stack trace (ilk 3 satir):")
                for line in entry.stack_trace[:3]:
                    print(f"    {line}")
                break
    
    # Zaman bazlı analiz: saate göre hata dağılımı
    hour_counter = Counter()
    for entry in error_entries:
        try:
            hour = entry.timestamp[11:13]
            hour_counter[hour] += 1
        except IndexError:
            pass
    
    print(f"n=== SAATLIK HATA DAGILIMI ===")
    for hour in sorted(hour_counter.keys()):
        bar = '#' * (hour_counter[hour] // max(1, max(hour_counter.values()) // 30))
        print(f"  {hour}:00 | {bar} ({hour_counter[hour]})")

if __name__ == "__main__":
    import sys
    log_file = sys.argv[1] if len(sys.argv) > 1 else '/var/log/myapp/application.log'
    
    print(f"Analiz ediliyor: {log_file}")
    entries = parse_multiline_log(log_file)
    analyze_errors(entries)
EOF
python3 parse_app_errors.py /var/log/myapp/application.log

Performans: Büyük Log Dosyalarını Verimli İşlemek

100GB log dosyasını parse etmeye çalışıyorsanız, tüm veriyi RAM’e yüklemek felakete davet çıkarmaktır. Generator tabanlı yaklaşım şart:

cat << 'EOF' > efficient_parser.py
import re
import mmap
from typing import Generator, Dict
import time

# Generator tabanlı parser - RAM dostu
def parse_nginx_generator(filepath: str) -> Generator[Dict, None, None]:
    pattern = re.compile(
        rb'(d+.d+.d+.d+).+[([^]]+)] '
        rb'"(w+) ([^s]+) [^"]+" (d+) (d+|-)'
    )
    
    parsed = 0
    with open(filepath, 'rb') as f:
        # mmap ile memory-mapped okuma
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            for line in iter(mm.readline, b''):
                match = pattern.search(line)
                if match:
                    parsed += 1
                    yield {
                        'ip': match.group(1).decode(),
                        'time': match.group(2).decode(),
                        'method': match.group(3).decode(),
                        'path': match.group(4).decode(),
                        'status': int(match.group(5)),
                        'size': int(match.group(6)) if match.group(6) != b'-' else 0
                    }
    
    print(f"Toplam {parsed} kayit islendi.")

def streaming_analysis(filepath: str):
    """Tek geçişte birden fazla metrik hesapla"""
    from collections import Counter, defaultdict
    
    ip_count = Counter()
    status_count = Counter()
    total_bytes = 0
    error_ips = defaultdict(int)
    start_time = time.time()
    record_count = 0
    
    for entry in parse_nginx_generator(filepath):
        record_count += 1
        ip_count[entry['ip']] += 1
        status_count[entry['status']] += 1
        total_bytes += entry['size']
        
        if entry['status'] >= 400:
            error_ips[entry['ip']] += 1
        
        # İlerleme göster
        if record_count % 500000 == 0:
            elapsed = time.time() - start_time
            print(f"  {record_count:,} kayit islendi ({record_count/elapsed:.0f} kayit/sn)")
    
    elapsed = time.time() - start_time
    print(f"nToplam sure: {elapsed:.2f}sn, Hiz: {record_count/elapsed:.0f} kayit/sn")
    print(f"Toplam veri: {total_bytes / (1024**3):.2f} GB")
    print(f"nTop 5 IP: {ip_count.most_common(5)}")
    print(f"Top 5 Hata Ureten IP: {sorted(error_ips.items(), key=lambda x: x[1], reverse=True)[:5]}")
    print(f"Status dagilimi: {dict(sorted(status_count.items()))}")

if __name__ == "__main__":
    import sys
    streaming_analysis(sys.argv[1] if len(sys.argv) > 1 else '/var/log/nginx/access.log')
EOF
python3 efficient_parser.py /var/log/nginx/access.log

Gerçek Dünya Senaryosu 3: Otomatik Rapor ve Alarm

Sabah 9’da posta kutunuzda günlük log analiz raporu hazır olsun. Cron ile çalışacak, hem rapor üreten hem de kritik durumda e-posta gönderen bir script:

cat << 'EOF' > daily_log_report.py
#!/usr/bin/env python3
"""
Gunluk log analiz raporu
Cron: 0 8 * * * /usr/bin/python3 /opt/scripts/daily_log_report.py
"""

import re
import smtplib
import json
import os
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from collections import Counter, defaultdict
from datetime import datetime, timedelta

# Konfigürasyon
CONFIG = {
    'nginx_log': '/var/log/nginx/access.log',
    'auth_log': '/var/log/auth.log',
    'report_dir': '/var/reports/logs',
    'smtp_host': 'localhost',
    'smtp_port': 25,
    'from_email': '[email protected]',
    'to_emails': ['[email protected]'],
    'hostname': os.uname().nodename,
    'brute_force_threshold': 20,
    'error_rate_threshold': 0.05  # %5 üzeri hata oranı alarm
}

def analyze_nginx_for_report():
    results = {
        'total_requests': 0,
        'unique_ips': set(),
        'status_codes': Counter(),
        'top_ips': Counter(),
        'top_paths': Counter(),
        'bytes_served': 0
    }
    
    pattern = re.compile(
        r'(d+.d+.d+.d+).+[([^]]+)] '
        r'"(w+) ([^s]+) [^"]+" (d+) (d+|-)'
    )
    
    # Sadece bugünün loglarını al
    today = datetime.now().strftime('%d/%b/%Y')
    
    try:
        with open(CONFIG['nginx_log'], 'r', errors='ignore') as f:
            for line in f:
                if today not in line:
                    continue
                match = pattern.search(line)
                if not match:
                    continue
                
                ip, time_str, method, path, status, size = match.groups()
                results['total_requests'] += 1
                results['unique_ips'].add(ip)
                results['status_codes'][int(status)] += 1
                results['top_ips'][ip] += 1
                results['top_paths'][path] += 1
                results['bytes_served'] += int(size) if size != '-' else 0
    except FileNotFoundError:
        pass
    
    results['unique_ips'] = len(results['unique_ips'])
    return results

def check_auth_anomalies():
    failed_by_ip = Counter()
    today = datetime.now().strftime('%b %d').replace(' 0', '  ')
    
    pattern = re.compile(
        r'Failed password for.*from (d+.d+.d+.d+)'
    )
    
    try:
        with open(CONFIG['auth_log'], 'r', errors='ignore') as f:
            for line in f:
                if not (line.startswith(today) or 
                        line.startswith(datetime.now().strftime('%b %d'))):
                    continue
                match = pattern.search(line)
                if match:
                    failed_by_ip[match.group(1)] += 1
    except FileNotFoundError:
        pass
    
    suspicious = {ip: count for ip, count in failed_by_ip.items() 
                  if count >= CONFIG['brute_force_threshold']}
    return failed_by_ip, suspicious

def build_report(nginx_data, auth_failed, suspicious_ips):
    error_count = sum(v for k, v in nginx_data['status_codes'].items() if k >= 400)
    total = nginx_data['total_requests'] or 1
    error_rate = error_count / total
    
    is_critical = bool(suspicious_ips) or error_rate > CONFIG['error_rate_threshold']
    
    report_lines = [
        f"SUNUCU: {CONFIG['hostname']}",
        f"TARIH: {datetime.now().strftime('%Y-%m-%d %H:%M')}",
        f"DURUM: {'[ALARM] DIKKAT GEREKTIREN DURUMLAR VAR' if is_critical else '[OK] Normal'}",
        "",
        "=== NGINX OZETI ===",
        f"Toplam Istek: {nginx_data['total_requests']:,}",
        f"Benzersiz IP: {nginx_data['unique_ips']:,}",
        f"Sunulan Veri: {nginx_data['bytes_served'] / (1024**2):.1f} MB",
        f"Hata Orani: {error_rate:.1%}",
        "",
        "HTTP Durum Kodlari:",
    ]
    
    for code in sorted(nginx_data['status_codes'].keys()):
        report_lines.append(f"  {code}: {nginx_data['status_codes'][code]:,}")
    
    report_lines += [
        "",
        "Top 5 IP:",
    ]
    for ip, count in nginx_data['top_ips'].most_common(5):
        report_lines.append(f"  {ip}: {count:,} istek")
    
    report_lines += [
        "",
        "=== GUVENLIK OZETI ===",
        f"Basarisiz SSH denemesi: {sum(auth_failed.values())}",
        f"Suphe cekici IP sayisi: {len(suspicious_ips)}",
    ]
    
    if suspicious_ips:
        report_lines.append("n[ALARM] BRUTE FORCE SUPHELISI IPLER:")
        for ip, count in sorted(suspicious_ips.items(), key=lambda x: x[1], reverse=True):
            report_lines.append(f"  {ip}: {count} basarisiz deneme")
            report_lines.append(f"  Onerilen komut: iptables -I INPUT -s {ip} -j DROP")
    
    return "n".join(report_lines), is_critical

def send_email(subject, body):
    msg = MIMEMultipart()
    msg['From'] = CONFIG['from_email']
    msg['To'] = ', '.join(CONFIG['to_emails'])
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain', 'utf-8'))
    
    try:
        with smtplib.SMTP(CONFIG['smtp_host'], CONFIG['smtp_port']) as server:
            server.sendmail(CONFIG['from_email'], CONFIG['to_emails'], msg.as_string())
        print("E-posta gonderildi.")
    except Exception as e:
        print(f"E-posta gonderilemedi: {e}")

def save_report(report_text):
    os.makedirs(CONFIG['report_dir'], exist_ok=True)
    filename = f"report_{datetime.now().strftime('%Y%m%d_%H%M')}.txt"
    filepath = os.path.join(CONFIG['report_dir'], filename)
    with open(filepath, 'w') as f:
        f.write(report_text)
    print(f"Rapor kaydedildi: {filepath}")

if __name__ == "__main__":
    print("Log analizi basliyor...")
    
    nginx_data = analyze_nginx_for_report()
    auth_failed, suspicious_ips = check_auth_anomalies()
    report_text, is_critical = build_report(nginx_data, auth_failed, suspicious_ips)
    
    print(report_text)
    save_report(report_text)
    
    subject_prefix = "[ALARM]" if is_critical else "[OK]"
    subject = f"{subject_prefix} {CONFIG['hostname']} Gunluk Log Raporu - {datetime.now().strftime('%Y-%m-%d')}"
    send_email(subject, report_text)
EOF
chmod +x daily_log_report.py
# Cron'a ekle:
# echo "0 8 * * * /usr/bin/python3 /opt/scripts/daily_log_report.py >> /var/log/log_report.log 2>&1" | crontab -
python3 daily_log_report.py

GeoIP ile Coğrafi Analiz

Hangi ülkelerden istek geliyor? Brute force hangi ülkeden? Bunun için geoip2 kütüphanesini kullanalım:

# Önce kütüphaneyi ve MaxMind veritabanını kuralım
pip3 install geoip2
# MaxMind ücretsiz GeoLite2 veritabanını indirin: https://dev.maxmind.com/geoip/geolite2-free-geolocation-data

cat << 'EOF' > geo_analysis.py
import re
import geoip2.database
from collections import Counter

def analyze_geo(log_file, geoip_db='/usr/share/GeoIP/GeoLite2-Country.mmdb'):
    ip_pattern = re.compile(r'^(d+.d+.d+.d+)')
    ip_counter = Counter()
    
    with open(log_file, 'r', errors='ignore') as f:
        for line in f:
            match = ip_pattern.match(line)
            if match:
                ip_counter[match.group(1)] += 1
    
    country_stats = Counter()
    unknown_ips = 0
    
    try:
        with geoip2.database.Reader(geoip_db) as reader:
            for ip, count in ip_counter.items():
                try:
                    response = reader.country(ip)
                    country = response.country.name or 'Bilinmeyen'
                    country_stats[country] += count
                except Exception:
                    unknown_ips += count
    except FileNotFoundError:
        print(f"GeoIP veritabani bulunamadi: {geoip_db}")
        print("MaxMind GeoLite2 veritabanini indirip yolu guncelleyin.")
        return
    
    print("=== ULKE BAZLI ISTEK DAGILIMI ===")
    for country, count in country_stats.most_common(15):
        bar = '#' * (count // max(1, max(country_stats.values()) // 40))
        print(f"  {country:<25} {bar} ({count:,})")
    
    if unknown_ips:
        print(f"n  Bilinmeyen/Ozel IP: {unknown_ips:,}")

if __name__ == "__main__":
    import sys
    analyze_geo(sys.argv[1] if len(sys.argv) > 1 else '/var/log/nginx/access.log')
EOF
python3 geo_analysis.py /var/log/nginx/access.log

Scripti Production’a Hazırlamak

Bir scripti “çalışır” yapmak ayrı şeydir, “production-ready” yapmak ayrı. İşte dikkat etmeniz gereken noktalar:

  • Logging ekleyin: Script kendi hatalarını da loglasın, print() yeterli değil
  • Timeout mekanizması: Çok büyük dosyalarda takılı kalmasın, sinyal handler ekleyin
  • Konfigürasyonu dışarı çıkarın: Sabit kodlanmış path’ler yerine argparse veya config dosyası kullanın
  • İzinleri kontrol edin: Script root olarak mı çalışmalı? Gerekirse sudoers ile sadece belirli komutlara izin verin
  • İdempotent olsun: Aynı logu iki kez işlediğinde duplicate alarm üretmesin, işlenen pozisyonu bir state dosyasına kaydedin
  • Hata yakalama: Her dosya okuma işlemini try/except içine alın, bir log dosyası okunamazsa tüm script durmasın
# State dosyası ile duplicate işlemeyi önleme örneği
cat << 'EOF' >> parse_nginx.py

def get_last_position(state_file):
    try:
        with open(state_file, 'r') as f:
            return int(f.read().strip())
    except (FileNotFoundError, ValueError):
        return 0

def save_position(state_file, position):
    with open(state_file, 'w') as f:
        f.write(str(position))

# Kullanim:
# state_file = '/var/run/nginx_parser.state'
# last_pos = get_last_position(state_file)
# with open(log_file) as f:
#     f.seek(last_pos)
#     for line in f:
#         process(line)
#     save_position(state_file, f.tell())
EOF

Sonuç

Log analizi sysadmin işinin belki de en az “eğlenceli” ama en kritik kısmı. İyi yazılmış Python scriptleri bu işi hem hızlandırır hem de güvenilir hale getirir. Yukarıdaki örneklerde gördüğünüz yaklaşımları kendi altyapınıza adapt ederken şunları aklınızda tutun:

Generator kullanmayı ihmal etmeyin, özellikle büyük dosyalarda. Regex pattern’larınızı re.compile() ile önceden derleyin, döngü içinde her seferinde derleme yapmayın. Tek geçişte birden fazla analiz yapın, aynı dosyayı beş farklı amaçla beş kez okumayın. State yönetimi ekleyin ki logrotate’den sonra kaldığınız yerden devam edebilin.

En önemli nokta: Bu scriptleri yazıp çekmece kapatmayın. Cron’a ekleyin, çıktılarını bir yerde toplayın, zaman içinde baseline oluşturun. “Normal” nedir bilmeden “anormal” tespit edemezsiniz. Python’u bu işte bir kez ciddiye aldığınızda, bir daha “Nginx loglarına bakmak gerekiyor” diye saatlerce terminal başında oturmazsınız.

Yorum yapın