awk ile Log Rotasyonu Sonrası Parçalı Dosyalardan Birleşik Erişim İstatistiği Çıkarma

Log rotation çalışıyor, gece yarısı cron işi dosyayı çevirdi, ve sabah geldiğinizde dünün trafik raporunu çıkarmak istiyorsunuz. Ama elimizde access.log, access.log.1, access.log.2.gz ve belki birkaç tane daha var. İşte o an şunu düşünürsünüz: “Bunları tek tek mi işleyeceğim?” Hayır. awk ile bu işi temiz, hızlı ve tekrar kullanılabilir şekilde halledebilirsiniz.

Bu yazıda gerçek bir prodüksiyon ortamında karşılaşacağınız senaryoyu ele alacağız: Log rotation sonrası parçalanmış Apache ve Nginx log dosyalarından birleşik erişim istatistikleri çıkarmak. Sıkıştırılmış dosyalarla başa çıkmak, farklı log formatlarını normalize etmek, ve sonunda anlamlı bir rapor üretmek.

Log Rotation Sonrası Gerçeklik

Çoğu sistemde logrotate varsayılan yapılandırmayla şöyle bir düzen bırakır:

/var/log/nginx/access.log          # Bugün yazılan, aktif
/var/log/nginx/access.log.1        # Dün
/var/log/nginx/access.log.2.gz     # Evvelsi gün, sıkıştırılmış
/var/log/nginx/access.log.3.gz
/var/log/nginx/access.log.4.gz

Eğer dateext seçeneği açıksa durum biraz farklılaşır:

/var/log/nginx/access.log
/var/log/nginx/access.log-20241201
/var/log/nginx/access.log-20241130.gz

Her iki durumu da ele alacağız. Önce temel awk komutlarını hatırlayalım, sonra gerçek senaryolara geçelim.

Temel: Tüm Parçalı Dosyaları Birlikte İşlemek

Sıkıştırılmamış dosyaları doğrudan awk’a vermek kolaydır:

awk '{print $1}' /var/log/nginx/access.log /var/log/nginx/access.log.1 | sort | uniq -c | sort -rn | head -20

Ama sıkıştırılmış dosyalar için zcat veya zless devreye giriyor. Bunları birleştirmenin en temiz yolu zcat ile pipe kullanmak:

{ cat /var/log/nginx/access.log /var/log/nginx/access.log.1; 
  zcat /var/log/nginx/access.log.2.gz /var/log/nginx/access.log.3.gz; } 
  | awk '{print $1}' | sort | uniq -c | sort -rn | head -20

Bu yaklaşım çalışır ama bakımı zordur. Daha iyi bir yol, glob pattern ile otomatik dosya tespiti yapmak.

Dosya Tespiti ve Akıllı Birleştirme

Şu script hem sıkıştırılmış hem sıkıştırılmamış dosyaları otomatik tespit eder:

#!/bin/bash
# log_collector.sh
# Kullanım: ./log_collector.sh /var/log/nginx/access.log

LOG_BASE="${1:-/var/log/nginx/access.log}"
LOG_DIR=$(dirname "$LOG_BASE")
LOG_NAME=$(basename "$LOG_BASE")

# Tüm ilgili log dosyalarını bul, sırala
{
    # Düz metin dosyaları
    for f in "$LOG_DIR"/"$LOG_NAME"* ; do
        [[ "$f" == *.gz ]] && continue
        [[ -f "$f" ]] && cat "$f"
    done

    # Sıkıştırılmış dosyalar
    for f in $(ls -t "$LOG_DIR"/"$LOG_NAME"*.gz 2>/dev/null); do
        zcat "$f"
    done
} | awk '
{
    # Nginx combined format: IP - user [tarih] "metod url protokol" status boyut "referer" "ua"
    if (NF >= 7) print $0
}
'

Bu yaklaşımı beğeniyorum çünkü dosya sayısından bağımsız çalışıyor. Yeni bir .gz dosyası eklendiğinde script’i güncellemenize gerek yok.

Apache ve Nginx Log Formatlarını Anlamak

Apache combined format ile Nginx default combined format neredeyse aynıdır:

192.168.1.1 - frank [10/Oct/2024:13:55:36 -0700] "GET /index.html HTTP/1.1" 200 2326 "http://example.com/" "Mozilla/5.0..."

Alan numaraları:

  • $1: IP adresi
  • $2: ident (genelde “-“)
  • $3: auth user (genelde “-“)
  • $4: tarih (köşeli parantez içinde, iki alan)
  • $7: HTTP metodu
  • $8: istek URL’i (tırnak içinde)
  • $9: HTTP versiyonu
  • $10 (veya farklı): HTTP status kodu

Köşeli parantez ve tırnak işaretleri alan ayrıştırmayı karmaşıklaştırır. Bunun için FS (field separator) ayarlamak yerine regex kullanmak daha güvenlidir:

awk '
{
    # Regex ile log satırını parçala
    if (match($0, /[([^]]+)]/, tarih_arr) &&
        match($0, /"([A-Z]+) ([^ ]+) HTTP/[0-9.]+"/,  istek_arr) &&
        match($0, /" ([0-9]{3}) /, status_arr)) {

        ip      = $1
        tarih   = tarih_arr[1]
        metod   = istek_arr[1]
        url     = istek_arr[2]
        status  = status_arr[1]

        print ip, tarih, metod, url, status
    }
}
' /var/log/nginx/access.log | head -5

match() fonksiyonunun üç argümanlı versiyonu (GNU awk’a özgü) capture group’ları doğrudan diziye aktarır. Eğer POSIX awk kullanıyorsanız bu çalışmaz, ama üretim sunucularının büyük çoğunluğunda gawk yüklüdür.

Gerçek Dünya Senaryosu 1: Saatlik Trafik Dağılımı

Bir e-ticaret sitesi yöneticisi olduğunuzu düşünün. Son 7 günün saatlik trafik dağılımını görmeniz gerekiyor, hangi saatlerde sunucunun zorlandığını anlamak için:

#!/bin/bash
# hourly_stats.sh

LOG_DIR="/var/log/nginx"
LOG_PATTERN="access.log"

{
    cat "$LOG_DIR/$LOG_PATTERN" 2>/dev/null
    cat "$LOG_DIR/$LOG_PATTERN".* 2>/dev/null | grep -v ".gz" || true
    ls "$LOG_DIR/$LOG_PATTERN"*.gz 2>/dev/null | xargs zcat 2>/dev/null || true
} | gawk '
{
    # Tarih alanını çıkar: [10/Oct/2024:13:55:36
    if (match($0, /[([0-9]{2}/[A-Za-z]{3}/[0-9]{4}):([0-9]{2})/, arr)) {
        saat = arr[2]
        istek_sayisi[saat]++
    }
}
END {
    print "Saat | İstek Sayısı | Görsel"
    print "-----|-------------|-------"
    for (s = 0; s < 24; s++) {
        saat_str = sprintf("%02d", s)
        sayi = istek_sayisi[saat_str] + 0

        # Basit bar chart
        bar = ""
        bar_uzunluk = int(sayi / 100)
        for (i = 0; i < bar_uzunluk && i < 50; i++) bar = bar "#"

        printf "%s:00 | %8d | %sn", saat_str, sayi, bar
    }
}
' | column -t -s '|'

Bu script size terminal üzerinde ASCII bar chart ile saatlik dağılım verir. Sabah stand-up’ında hızlıca paylaşabileceğiniz bir çıktı.

Gerçek Dünya Senaryosu 2: Status Kodu Analizi ve Hata Oranı

500 hatalarının spike yaptığı bir dönemin analizini yapmak istiyorsunuz:

gawk '
BEGIN {
    toplam = 0
    ay_cevirici["Jan"] = "01"; ay_cevirici["Feb"] = "02"
    ay_cevirici["Mar"] = "03"; ay_cevirici["Apr"] = "04"
    ay_cevirici["May"] = "05"; ay_cevirici["Jun"] = "06"
    ay_cevirici["Jul"] = "07"; ay_cevirici["Aug"] = "08"
    ay_cevirici["Sep"] = "09"; ay_cevirici["Oct"] = "10"
    ay_cevirici["Nov"] = "11"; ay_cevirici["Dec"] = "12"
}
{
    if (match($0, /[([0-9]{2})/([A-Za-z]{3})/([0-9]{4}):([0-9]{2})/, arr)) {
        gun  = arr[1]
        ay   = ay_cevirici[arr[2]]
        yil  = arr[3]
        saat = arr[4]
        tarih_key = yil "-" ay "-" gun " " saat ":00"
    } else {
        next
    }

    # Status kodunu çıkar
    if (match($0, /" ([1-5][0-9]{2}) /, s_arr)) {
        status = s_arr[1]
        istek_sayisi[tarih_key]++
        status_sayisi[tarih_key][status]++
        toplam++

        if (status ~ /^5/) {
            hata_5xx[tarih_key]++
        } else if (status ~ /^4/) {
            hata_4xx[tarih_key]++
        }
    }
}
END {
    printf "%-20s %8s %8s %8s %8sn", "Zaman", "Toplam", "5xx", "4xx", "Hata%"
    printf "%sn", "------------------------------------------------------------------------"

    n = asorti(istek_sayisi, sirali_tarihler)
    for (i = 1; i <= n; i++) {
        t = sirali_tarihler[i]
        toplam_t  = istek_sayisi[t]
        hata5_t   = hata_5xx[t] + 0
        hata4_t   = hata_4xx[t] + 0
        hata_oran = (toplam_t > 0) ? ((hata5_t + hata4_t) / toplam_t * 100) : 0

        isaret = (hata_oran > 5) ? " <<< YUKSEK" : ""
        printf "%-20s %8d %8d %8d %7.2f%%%sn", t, toplam_t, hata5_t, hata4_t, hata_oran, isaret
    }
}
' /var/log/nginx/access.log.1

Bu çıktı size hangi saatte hata oranının kritik eşiği geçtiğini gösterir. <<< YUKSEK işaretini görünce o zaman dilimine odaklanırsınız.

Gerçek Dünya Senaryosu 3: IP Bazlı Şüpheli Trafik Tespiti

Birden fazla log dosyasından kümülatif olarak aynı IP’nin kaç istek attığını analiz etmek, DDoS veya scraping tespiti için önemlidir:

#!/bin/bash
# suspicious_ips.sh
# Eşik değeri: dakikada 100'den fazla istek

ESIK=100
LOG_FILES=("$@")

if [ ${#LOG_FILES[@]} -eq 0 ]; then
    LOG_FILES=(/var/log/nginx/access.log /var/log/nginx/access.log.1)
fi

{
    for f in "${LOG_FILES[@]}"; do
        if [[ "$f" == *.gz ]]; then
            zcat "$f"
        else
            cat "$f"
        fi
    done
} | gawk -v esik="$ESIK" '
{
    ip = $1
    if (match($0, /[([0-9]{2}/[A-Za-z]{3}/[0-9]{4}):([0-9]{2}):([0-9]{2})/, arr)) {
        # Dakika bazlı key
        dakika_key = ip "###" arr[1] ":" arr[2] ":" arr[3]
        ip_dakika[dakika_key]++
        ip_toplam[ip]++
    }
}
END {
    print "Şüpheli IP Raporu"
    print "=================="
    print ""

    for (key in ip_dakika) {
        if (ip_dakika[key] >= esik) {
            n = split(key, parcalar, "###")
            ip_addr = parcalar[1]
            zaman   = parcalar[2]
            sudphe_sayisi[ip_addr]++
            if (ip_dakika[key] > en_yuksek[ip_addr]) {
                en_yuksek[ip_addr] = ip_dakika[key]
                en_yuksek_zaman[ip_addr] = zaman
            }
        }
    }

    for (ip_addr in sudphe_sayisi) {
        printf "IP: %-18s | Toplam İstek: %6d | Pik: %4d/dk (%s) | Şüpheli Dakika: %dn",
            ip_addr,
            ip_toplam[ip_addr],
            en_yuksek[ip_addr],
            en_yuksek_zaman[ip_addr],
            sudphe_sayisi[ip_addr]
    }
}
' | sort -t'|' -k2 -rn

Bu script’i cron’a ekleyip çıktısını mail ile gönderebilirsiniz. Prodüksiyonda çalışan bir versiyonudur bunun.

Birden Fazla Sanal Host’tan Gelen Logları Birleştirmek

Eğer Nginx’inizde access_log direktifini her server bloğu için ayrı ayarladıysanız, birden fazla site’ın loglarını birleştirmeniz gerekebilir:

#!/bin/bash
# multi_vhost_stats.sh

LOG_DIZINI="/var/log/nginx"

# Tüm access log dosyalarını bul
find "$LOG_DIZINI" -name "access*.log*" | while read -r dosya; do
    if [[ "$dosya" == *.gz ]]; then
        zcat "$dosya"
    else
        cat "$dosya"
    fi
done | gawk '
{
    # URL path analizi - sorgu string olmadan
    if (match($0, /"[A-Z]+ ([^ ?]+)[^"]*" ([0-9]{3})/, arr)) {
        url    = arr[1]
        status = arr[2]

        # Sadece 2xx ve 3xx
        if (status ~ /^[23]/) {
            # URL'yi normalize et: /urun/123 -> /urun/ID gibi
            gsub(//[0-9]+/, "/NUM", url)
            url_hit[url]++
        }
    }
}
END {
    n = asorti(url_hit, sirali, "@val_num_desc")
    print "En Çok Ziyaret Edilen Endpoint'ler (Normalize Edilmiş)"
    print "========================================================="
    for (i = 1; i <= n && i <= 30; i++) {
        printf "%6d  %sn", url_hit[sirali[i]], sirali[i]
    }
}
'

gsub(//[0-9]+/, "/NUM", url) satırı önemli: /urun/123, /urun/456, /urun/789 gibi binlerce farklı URL’yi /urun/NUM olarak normalleştirir. Böylece anlamlı istatistik elde edersiniz, sayısal ID’lerin çok olduğu API loglarında bu olmadan anlamsız bir liste alırsınız.

Performans: Büyük Log Dosyalarında awk’ı Hızlandırmak

Birkaç GB’lık log dosyalarında awk yavaş çalışabilir. Birkaç pratik ipucu:

Gereksiz alanları işleme: Sadece ihtiyacınız olan alanları kullanın. print $0 yerine print $1, $9 gibi.

Erken eleme ile satır sayısını azalt:

# Önce grep ile filtrele, sonra awk ile işle
grep " 500 | 502 | 503 " /var/log/nginx/access.log | 
gawk '{print $1, $7}' | sort | uniq -c | sort -rn

Paralel işleme ile birden fazla dosyayı eş zamanlı işleyin:

#!/bin/bash
# parallel_log_process.sh

process_file() {
    local dosya="$1"
    if [[ "$dosya" == *.gz ]]; then
        zcat "$dosya"
    else
        cat "$dosya"
    fi | awk '{print $1}' | sort | uniq -c
}

export -f process_file

# GNU parallel varsa
if command -v parallel &>/dev/null; then
    find /var/log/nginx -name "access.log*" | 
        parallel process_file {} | 
        awk '{ip[$2] += $1} END {for (i in ip) print ip[i], i}' | 
        sort -rn | head -20
else
    # Parallel yoksa sıralı çalıştır
    find /var/log/nginx -name "access.log*" | while read -r f; do
        process_file "$f"
    done | awk '{ip[$2] += $1} END {for (i in ip) print ip[i], i}' | 
        sort -rn | head -20
fi

Bu script, parallel yüklüyse dosyaları eş zamanlı işler, yoksa sıralı çalışır. Her iki durumda da aynı çıktıyı üretir.

Sık Yapılan Hatalar ve Çözümleri

Sorun: Sıkıştırılmış dosyada tarih alanını yanlış okumak.

zcat çıktısını pipe ile awk’a verirken FILENAME değişkeni boş gelir. Hangi dosyadan geldiğini takip etmeniz gerekiyorsa:

for dosya in /var/log/nginx/access.log*.gz; do
    zcat "$dosya" | gawk -v kaynak="$dosya" '
    {
        print kaynak, $1, $9
    }
    '
done

Sorun: Farklı timezone’daki logları birleştirmek.

Birden fazla sunucudan gelen loglar farklı timezone ile yazılmış olabilir. logrotate ile merkezi bir log sunucusuna gönderiyorsanız bu problem çıkar. Bu durumda UTC’ye normalize eden bir ön işlem adımı eklemek gerekir, ama bu ayrı bir yazı konusu.

Sorun: Büyük log dosyalarında gawk bellek sorunu.

Çok sayıda benzersiz key’i dizide tutarsanız bellek dolabilir. Çözüm: ara sonuçları geçici dosyaya yazmak veya sort | uniq kombinasyonunu dizi yerine kullanmak.

# Dizi yerine sort|uniq pipeline - bellek dostu
{ cat /var/log/nginx/access.log; zcat /var/log/nginx/access.log.*.gz; } | 
    awk '{print $1}' | 
    sort -S 50% | 
    uniq -c | 
    sort -rn | 
    head -50

sort -S 50% ile mevcut RAM’in yüzde ellisini sort işlemine ayırıyorsunuz, bu büyük dosyalarda performansı belirgin şekilde artırır.

Sonuç

Log rotation’ın parçalara böldüğü dosyalardan anlamlı istatistik çıkarmak, başta karmaşık görünse de awk’ın gücüyle oldukça yönetilebilir bir hal alıyor. Özetlemek gerekirse:

  • Sıkıştırılmış ve sıkıştırılmamış dosyaları {cat ...; zcat ...;} bloğuyla birleştirin
  • Regex ile log formatını güvenilir şekilde ayrıştırın, alan numaralarına körü körüne güvenmeyin
  • URL’leri normalleştirin, yoksa sayısal ID’ler istatistiği anlamsızlaştırır
  • Büyük dosyalarda grep ile ön filtreleme yapın, awk’a daha az satır gönderin
  • parallel kullanabiliyorsanız dosyaları paralel işleyin

Bu script’leri /usr/local/bin/ altına atıp cron’a bağladığınızda, sabah geldiğinizde raporların sizi beklediğini göreceksiniz. Logları elle karıştırmak yerine awk’ın sizin için yaptığı işe güvenip daha önemli sorunlara odaklanabilirsiniz.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir