LocalAI ile Gerçek Zamanlı Log Analizi: Sunucu Hatalarını Yapay Zeka ile Otomatik Tespit ve Sınıflandırma
Gece 2’de pager çalıyor, ekranına bakıyorsun ve 50.000 satırlık log dosyasıyla karşı karşıyasın. Nerede başlayacaksın? Hangi hata kritik, hangisi sadece gürültü? İşte tam bu noktada LocalAI tabanlı bir log analiz sistemi hayat kurtarıcı oluyor. Bu yazıda, sunucunda tamamen yerel çalışan bir yapay zeka modeliyle gerçek zamanlı log analizi nasıl yapılır, bunu adım adım inceleyeceğiz.
Neden LocalAI ile Log Analizi?
Klasik log analiz araçları (Elasticsearch, Graylog, Splunk) kesinlikle güçlü araçlar. Ancak hepsinin ortak bir sorunu var: kural tabanlı çalışıyorlar. “ERROR” içeren satırları yakala, şu regex pattern’ı ara, şu eşiği geç uyar. Peki ya daha önce hiç görmediğin bir hata mesajı? Ya da birbirleriyle ilişkili ama ayrı ayrı zararsız görünen birkaç warning?
LocalAI burada devreye giriyor. OpenAI API uyumlu, tamamen açık kaynaklı ve en önemlisi verilerini dışarı göndermiyorsun. Production log’larında müşteri verileri, internal IP adresleri, servis konfigürasyonları olabilir. Bunları herhangi bir bulut API’ye göndermek ciddi bir güvenlik riski. LocalAI ile model sunucunda çalışıyor, veri sunucuda kalıyor.
Ayrıca maliyet açısından düşün: binlerce log satırını GPT-4 API’ye göndermeye kalktığında token maliyeti hızla patlar. LocalAI ile bu maliyet sıfır.
Sistem Mimarisi
Bu yazıda kuracağımız sistemin genel yapısı şöyle:
- Log Kaynakları: syslog, nginx/apache access logs, application logs, systemd journal
- Log Toplayıcı: Python tabanlı log watcher (watchdog kütüphanesi)
- Analiz Motoru: LocalAI üzerinde çalışan Mistral 7B veya Llama 3 modeli
- Sınıflandırma Katmanı: Her log satırı için severity, kategori ve önerilen aksiyon
- Bildirim: Slack webhook veya email entegrasyonu
LocalAI Kurulumu ve Model Hazırlığı
Önce LocalAI’ı Docker ile ayağa kaldıralım. Bu en hızlı ve izole yöntem:
# LocalAI dizinini oluştur
mkdir -p /opt/localai/models
cd /opt/localai
# Docker Compose dosyası oluştur
cat > docker-compose.yml << 'EOF'
version: '3.8'
services:
localai:
image: localai/localai:latest-aio-cpu
ports:
- "8080:8080"
volumes:
- ./models:/build/models
environment:
- THREADS=4
- CONTEXT_SIZE=4096
- MODELS_PATH=/build/models
restart: unless-stopped
deploy:
resources:
limits:
memory: 8G
EOF
docker-compose up -d
Şimdi bir model indirelim. Log analizi için Mistral 7B oldukça iyi sonuç veriyor, hem hızlı hem de teknik metinlerde başarılı:
# Mistral 7B GGUF modelini indir
cd /opt/localai/models
wget https://huggingface.co/TheBloke/Mistral-7B-Instruct-v0.2-GGUF/resolve/main/mistral-7b-instruct-v0.2.Q4_K_M.gguf
# Model konfigürasyon dosyası oluştur
cat > mistral.yaml << 'EOF'
name: mistral
backend: llama
model: mistral-7b-instruct-v0.2.Q4_K_M.gguf
parameters:
temperature: 0.1
top_p: 0.9
max_new_tokens: 512
context_size: 4096
template:
chat: |
<s>[INST] {{.Input}} [/INST]
EOF
Düşük temperature (0.1) kullandığımıza dikkat et. Log analizi yaratıcılık gerektirmiyor, tutarlılık ve tekrarlanabilirlik istiyor. Temperature 0.1 ile model her seferinde benzer cevaplar üretiyor.
Log Analiz Script’inin Temeli
Şimdi asıl işi yapacak Python script’ini yazalım. Önce temel yapıyı kuralım:
# Gerekli paketleri yükle
pip3 install watchdog requests python-dotenv colorlog
# Ana script dosyasını oluştur
cat > /opt/localai/log_analyzer.py << 'PYEOF'
#!/usr/bin/env python3
import requests
import json
import re
import sys
from datetime import datetime
LOCALAI_URL = "http://localhost:8080/v1/chat/completions"
MODEL_NAME = "mistral"
def analyze_log_entry(log_line: str) -> dict:
"""Tek bir log satırını analiz et ve sınıflandır"""
prompt = f"""Sen bir Linux sistem yöneticisi asistanısın.
Aşağıdaki log satırını analiz et ve JSON formatında yanıt ver.
Log satırı: {log_line}
Şu bilgileri JSON olarak döndür:
- severity: "CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO" değerlerinden biri
- category: "authentication", "disk", "memory", "network", "application", "security", "database", "other"
- summary: Kısa Türkçe açıklama (max 100 karakter)
- action_required: true/false
- suggested_action: Önerilen aksiyon (sadece action_required true ise)
Sadece JSON döndür, başka bir şey yazma."""
payload = {
"model": MODEL_NAME,
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.1,
"max_tokens": 256
}
try:
response = requests.post(LOCALAI_URL, json=payload, timeout=30)
content = response.json()["choices"][0]["message"]["content"]
# JSON'u temizle ve parse et
json_match = re.search(r'{.*}', content, re.DOTALL)
if json_match:
return json.loads(json_match.group())
except Exception as e:
return {"severity": "UNKNOWN", "error": str(e)}
return {"severity": "UNKNOWN"}
# Test
if __name__ == "__main__":
test_log = "Failed password for invalid user admin from 185.234.xx.xx port 52341 ssh2"
result = analyze_log_entry(test_log)
print(json.dumps(result, ensure_ascii=False, indent=2))
PYEOF
chmod +x /opt/localai/log_analyzer.py
python3 /opt/localai/log_analyzer.py
Gerçek Zamanlı Log İzleme
Şimdi watchdog ile dosyaları gerçek zamanlı izleyelim. Bu kısım özellikle production ortamda kritik:
cat > /opt/localai/log_watcher.py << 'PYEOF'
#!/usr/bin/env python3
import time
import os
import threading
import queue
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from log_analyzer import analyze_log_entry
import logging
# Kuyruk tabanlı analiz için thread-safe queue
log_queue = queue.Queue(maxsize=1000)
# İzlenecek log dosyaları
WATCH_PATHS = [
"/var/log/syslog",
"/var/log/auth.log",
"/var/log/nginx/error.log",
"/var/log/nginx/access.log",
"/var/log/postgresql/postgresql.log"
]
class LogFileHandler(FileSystemEventHandler):
def __init__(self, filepath):
self.filepath = filepath
self.file_pos = os.path.getsize(filepath) if os.path.exists(filepath) else 0
def on_modified(self, event):
if event.src_path != self.filepath:
return
with open(self.filepath, 'r', encoding='utf-8', errors='ignore') as f:
f.seek(self.file_pos)
new_lines = f.readlines()
self.file_pos = f.tell()
for line in new_lines:
line = line.strip()
if line and len(line) > 20: # Boş ve çok kısa satırları atla
try:
log_queue.put_nowait({
"source": self.filepath,
"line": line,
"timestamp": time.time()
})
except queue.Full:
logging.warning("Log queue dolu, satır atlandı")
def analysis_worker():
"""Kuyruktan log satırlarını alıp analiz eden worker thread"""
while True:
try:
log_entry = log_queue.get(timeout=5)
result = analyze_log_entry(log_entry["line"])
# Sadece önemli bulguları işle
if result.get("severity") in ["CRITICAL", "HIGH"]:
handle_critical_finding(log_entry, result)
log_queue.task_done()
except queue.Empty:
continue
except Exception as e:
logging.error(f"Analiz hatası: {e}")
def handle_critical_finding(log_entry: dict, analysis: dict):
"""Kritik bulgular için aksiyon al"""
timestamp = time.strftime('%Y-%m-%d %H:%M:%S',
time.localtime(log_entry["timestamp"]))
print(f"n{'='*60}")
print(f"[{analysis['severity']}] {timestamp}")
print(f"Kaynak: {log_entry['source']}")
print(f"Log: {log_entry['line'][:200]}")
print(f"Özet: {analysis.get('summary', 'N/A')}")
print(f"Kategori: {analysis.get('category', 'unknown')}")
if analysis.get('action_required'):
print(f"Önerilen Aksiyon: {analysis.get('suggested_action', 'N/A')}")
print(f"{'='*60}n")
# Buraya Slack/email bildirimi eklenebilir
if __name__ == "__main__":
# Worker thread'leri başlat (2 paralel analiz)
for i in range(2):
t = threading.Thread(target=analysis_worker, daemon=True)
t.start()
# Dosya izleyicileri başlat
observers = []
for log_path in WATCH_PATHS:
if os.path.exists(log_path):
handler = LogFileHandler(log_path)
observer = Observer()
observer.schedule(handler, os.path.dirname(log_path), recursive=False)
observer.start()
observers.append(observer)
print(f"İzleniyor: {log_path}")
print("nLog izleme başladı. Çıkmak için Ctrl+C...n")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
for obs in observers:
obs.stop()
obs.join()
PYEOF
Toplu Log Analizi ve Raporlama
Gerçek zamanlı izlemenin yanında, geçmiş log’ları toplu analiz etmek de çok faydalı. Özellikle bir olay sonrası post-mortem analizi için:
cat > /opt/localai/batch_analyzer.py << 'PYEOF'
#!/usr/bin/env python3
"""
Belirli bir zaman aralığındaki logları toplu analiz eder
ve özet rapor üretir
"""
import sys
import json
import requests
from collections import defaultdict
from datetime import datetime
LOCALAI_URL = "http://localhost:8080/v1/chat/completions"
def batch_analyze_logs(log_file: str, max_lines: int = 500):
"""Log dosyasını batch olarak analiz et"""
with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
lines = f.readlines()[-max_lines:] # Son N satırı al
# Satırları 20'lik gruplara böl (context için)
chunks = [lines[i:i+20] for i in range(0, len(lines), 20)]
all_findings = []
for idx, chunk in enumerate(chunks):
chunk_text = "".join(chunk)
print(f"Chunk {idx+1}/{len(chunks)} analiz ediliyor...", end='r')
prompt = f"""Aşağıdaki log bloğunu analiz et.
Dikkat çeken hataları, anomalileri ve güvenlik sorunlarını listele.
Her bulgu için şu formatı kullan:
[SEVERİTE] KATEGORI: Açıklama
Log bloğu:
{chunk_text[:2000]}
Sadece gerçek sorunları listele. Eğer sorun yoksa "Sorun tespit edilmedi" yaz."""
payload = {
"model": "mistral",
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.1,
"max_tokens": 400
}
try:
response = requests.post(LOCALAI_URL, json=payload, timeout=45)
content = response.json()["choices"][0]["message"]["content"]
if "Sorun tespit edilmedi" not in content:
all_findings.append({
"chunk": idx + 1,
"findings": content
})
except Exception as e:
print(f"nHata (chunk {idx+1}): {e}")
return all_findings
def generate_summary_report(findings: list) -> str:
"""Tüm bulgulardan özet rapor üret"""
if not findings:
return "Log analizi tamamlandı. Önemli sorun tespit edilmedi."
all_text = "n".join([f["findings"] for f in findings])
prompt = f"""Aşağıda bir log dosyasının analiz bulgularını görüyorsun.
Bu bulguları özetle ve şu başlıklarda bir rapor hazırla:
1. Kritik Sorunlar (Acil müdahale gerektiren)
2. Güvenlik Uyarıları
3. Performans Sorunları
4. Önerilen Aksiyonlar (öncelik sırasıyla)
Bulgular:
{all_text[:3000]}
Raporu Türkçe, profesyonel ve özlü şekilde yaz."""
payload = {
"model": "mistral",
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.2,
"max_tokens": 600
}
response = requests.post(LOCALAI_URL, json=payload, timeout=60)
return response.json()["choices"][0]["message"]["content"]
if __name__ == "__main__":
log_file = sys.argv[1] if len(sys.argv) > 1 else "/var/log/syslog"
print(f"n{log_file} analiz ediliyor...n")
findings = batch_analyze_logs(log_file)
print(f"n{len(findings)} adet sorunlu blok tespit edildi.n")
print("Özet rapor hazırlanıyor...n")
report = generate_summary_report(findings)
print("=" * 60)
print("LOG ANALİZ RAPORU")
print("=" * 60)
print(report)
# Raporu dosyaya kaydet
report_file = f"/tmp/log_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
with open(report_file, 'w') as f:
f.write(report)
print(f"nRapor kaydedildi: {report_file}")
PYEOF
# Kullanım
python3 /opt/localai/batch_analyzer.py /var/log/syslog
Slack Bildirim Entegrasyonu
Kritik bulgular için anında bildirim almak istiyorsan:
cat > /opt/localai/notifier.py << 'PYEOF'
#!/usr/bin/env python3
import requests
import os
import socket
SLACK_WEBHOOK = os.getenv("SLACK_WEBHOOK_URL", "")
HOSTNAME = socket.gethostname()
def send_slack_alert(severity: str, category: str,
log_line: str, summary: str, action: str = ""):
"""Slack'e uyarı gönder"""
if not SLACK_WEBHOOK:
return
# Severity'e göre emoji ve renk belirle
severity_config = {
"CRITICAL": {"emoji": "🔴", "color": "#FF0000"},
"HIGH": {"emoji": "🟠", "color": "#FF8C00"},
"MEDIUM": {"emoji": "🟡", "color": "#FFD700"},
"LOW": {"emoji": "🔵", "color": "#4169E1"}
}
config = severity_config.get(severity, {"emoji": "⚪", "color": "#808080"})
blocks = [
{
"type": "header",
"text": {
"type": "plain_text",
"text": f"{config['emoji']} [{severity}] Log Uyarısı - {HOSTNAME}"
}
},
{
"type": "section",
"fields": [
{"type": "mrkdwn", "text": f"*Kategori:*n{category}"},
{"type": "mrkdwn", "text": f"*Sunucu:*n{HOSTNAME}"}
]
},
{
"type": "section",
"text": {"type": "mrkdwn", "text": f"*Özet:*n{summary}"}
},
{
"type": "section",
"text": {"type": "mrkdwn", "text": f"*Log:*n```{log_line[:300]}```"}
}
]
if action:
blocks.append({
"type": "section",
"text": {"type": "mrkdwn", "text": f"*Önerilen Aksiyon:*n{action}"}
})
payload = {"blocks": blocks, "username": "LocalAI Log Analyzer"}
try:
requests.post(SLACK_WEBHOOK, json=payload, timeout=10)
except Exception as e:
print(f"Slack bildirimi gönderilemedi: {e}")
PYEOF
Systemd Servisi Olarak Çalıştırma
Bu sistemi bir servis olarak kuralım ki sunucu yeniden başladığında otomatik devreye girsin:
# Systemd service dosyası oluştur
cat > /etc/systemd/system/localai-log-analyzer.service << 'EOF'
[Unit]
Description=LocalAI Real-time Log Analyzer
After=network.target docker.service
Requires=docker.service
[Service]
Type=simple
User=root
WorkingDirectory=/opt/localai
Environment="SLACK_WEBHOOK_URL=https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
ExecStartPre=/bin/sleep 10
ExecStart=/usr/bin/python3 /opt/localai/log_watcher.py
Restart=on-failure
RestartSec=30
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
EOF
systemctl daemon-reload
systemctl enable localai-log-analyzer
systemctl start localai-log-analyzer
# Durum kontrol
systemctl status localai-log-analyzer
journalctl -u localai-log-analyzer -f
Performans Optimizasyonu ve Pratik İpuçları
Bu sistemi production’da çalıştırırken karşılaşabileceğin sorunlar ve çözümleri:
Model Seçimi Önemli Mistral 7B Q4_K_M quantization ile CPU üzerinde saniyede yaklaşık 3-8 token üretiyor. Çok yoğun log trafiğinde analiz kuyruğu birikebilir. Bunun için:
- Analiz worker sayısını artır (2’den 4’e)
- Düşük severity logları (INFO, DEBUG) filtreyle geç, sadece WARNING ve üstünü analiz et
- GPU varsa CUDA desteğiyle LocalAI’ı çalıştır, hız 10-20 kat artıyor
Akıllı Filtreleme Her log satırını AI’a göndermek verimsiz. Önce basit bir ön filtre yaz:
# Önceden bilinen gürültülü pattern'ları filtrele
NOISE_PATTERNS = [
r"systemd[d+]: Started Session",
r"CRON[d+]: (root) CMD",
r"kernel: audit:",
r"sshd.*Accepted publickey", # Başarılı SSH girişleri
r"UFW BLOCK.*b(80|443)b" # Rutin firewall bloğu
]
Bu pattern’larla eşleşen satırları doğrudan atla, AI’ya gönderme. Analiz yükü %60-70 oranında düşüyor.
Rate Limiting Aynı tip hatalar peş peşe geliyorsa her birini ayrı ayrı analiz etme. Aynı pattern’dan 5 dakika içinde 10’dan fazla gelirse tek bir toplu analiz yap:
- Sliding window ile duplicate detection ekle
- Son 5 dakikadaki benzer satırları grupla, bir kez gönder
- Analiz sonucunu cache’le, aynı pattern gelince cache’den dön
Memory Yönetimi LocalAI varsayılan olarak modeli sürekli bellekte tutuyor (iyi). Ama context boyutu konusunda dikkatli ol. 4096 token context yeterli, büyütmek memory kullanımını ciddi artırıyor. Log analizi için bu kadar context fazlasıyla yeterli.
Gerçek Dünya Senaryosu: Brute Force Tespiti
Sistemin nasıl çalıştığını somut bir örnekle görelim. Auth.log’dan gelen şu satırları düşün:
Dec 15 03:12:45 sshd[12345]: Failed password for root from 185.234.xx.xx
Dec 15 03:12:47 sshd[12346]: Failed password for root from 185.234.xx.xx
Dec 15 03:12:49 sshd[12347]: Failed password for admin from 185.234.xx.xx
Dec 15 03:12:51 sshd[12348]: Failed password for ubuntu from 185.234.xx.xx
Sistemimiz bu satırları kuyruğa alıyor, Mistral analiz ediyor ve şöyle bir sonuç üretiyor:
{
"severity": "CRITICAL",
"category": "security",
"summary": "185.234.xx.xx IP adresinden SSH brute force saldırısı",
"action_required": true,
"suggested_action": "iptables -A INPUT -s 185.234.xx.xx -j DROP komutuyla IP'yi engelle, fail2ban konfigürasyonunu kontrol et"
}
Slack’e bildirim gidiyor, sen sabah uyandığında durumu zaten biliyorsun ve IP zaten engellenmiş.
Sonuç
LocalAI tabanlı log analiz sistemi kurmanın önündeki en büyük engel “nasıl başlayacağım” sorusu. Gördüğün gibi aslında birkaç Python script’i ve bir Docker container ile oldukça güçlü bir sistem kurabiliyorsun. Tüm veriler sunucunda kalıyor, dışarıya tek byte bile çıkmıyor.
Bu sistemin en büyük değeri gece yarısı tetiklenen alarmları değil, sabah uyandığında sana hazır bir özet rapor sunması. “Gece ne oldu?” sorusunun cevabı artık 50.000 satır log okumak değil, 10 maddelik bir özet liste.
Tabii ki bu sistem perfect değil. LLM’ler zaman zaman yanlış sınıflandırma yapabiliyor, özellikle çok spesifik uygulama loglarında. Bu yüzden sistemini kural tabanlı araçların yerine değil, yanına koy. Elasticsearch veya Loki varsa onları kullanmaya devam et, LocalAI analizini bir katman üstüne ekle.
Bir sonraki adım olarak analiz sonuçlarını Prometheus metriklere çevirmeyi ve Grafana’da görselleştirmeyi düşünebilirsin. Ya da sistemi biraz daha geliştirip otomatik remediation ekleyebilirsin: tespit et, analiz et, ve önceden tanımlanmış aksiyonları otomatik uygula. Ama bu zaten başka bir yazının konusu.
