Kafka Consumer Lag Yönetimi: Gecikme Tespiti, Analizi ve Otomatik Ölçeklendirme Stratejileri

Production ortamında Kafka cluster’ı yönetirken en çok baş ağrıtan konulardan biri consumer lag’i kontrol altına almaktır. Bir sabah uyandığında lag’in milyonlara ulaştığını görmek, o günün tamamını berbat etmeye yeterlidir. Bu yazıda consumer lag’i nasıl tespit edeceğini, analiz edeceğini ve otomatik ölçeklendirme stratejileriyle nasıl yöneteceğini ele alacağız.

Consumer Lag Nedir ve Neden Önemlidir?

Consumer lag, Kafka topic’indeki son mesajın offset’i ile consumer’ın şu an işlediği offset arasındaki farktır. Basitçe söylemek gerekirse, consumer’ın producer’ın gerisinde ne kadar kaldığını gösterir.

Matematiksel olarak şöyle ifade edilir:

Lag = Log End Offset – Current Offset

Sıfır lag idealdir ama gerçek dünyada her zaman biraz lag olur. Sorun, bu lag’in kontrolsüzce büyümesidir. E-ticaret sistemlerinde sipariş işleme pipeline’ında lag büyüdüğünde müşteriler siparişlerinin durumunu göremez, fraud detection sistemleri geride kalır, bildirimler geç gider. Yani lag soyut bir metrik değil, doğrudan iş etkisi olan bir operasyonel sorundur.

Lag Tespiti: Temel Araçlar ve Komutlar

Kafka Native Araçlarla Lag Kontrolü

Kafka’nın kendi araçlarıyla hızlıca lag durumunu görmek için şu komutları kullanabilirsin:

# Tüm consumer group'larını listele
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# Belirli bir consumer group'un detaylı lag bilgisi
kafka-consumer-groups.sh 
  --bootstrap-server localhost:9092 
  --group my-payment-consumer 
  --describe

# Tüm group'ların lag özetini göster
kafka-consumer-groups.sh 
  --bootstrap-server localhost:9092 
  --all-groups 
  --describe 2>/dev/null | awk 'NR==1 || /[0-9]+$/'

Bu komutun çıktısında şu kolonları göreceksin:

  • GROUP: Consumer group adı
  • TOPIC: İzlenen topic
  • PARTITION: Partition numarası
  • CURRENT-OFFSET: Consumer’ın son commit ettiği offset
  • LOG-END-OFFSET: Topic’teki son mesajın offset’i
  • LAG: İkisi arasındaki fark
  • CONSUMER-ID: Consumer instance kimliği
  • HOST: Consumer’ın çalıştığı sunucu
  • CLIENT-ID: Consumer client kimliği

Lag İzleme Script’i

Monitoring sistemine entegre edebileceğin basit ama etkili bir bash script’i:

#!/bin/bash
# kafka-lag-monitor.sh

BOOTSTRAP_SERVER="kafka1:9092,kafka2:9092,kafka3:9092"
ALERT_THRESHOLD=10000
LOG_FILE="/var/log/kafka-lag-monitor.log"

check_lag() {
    local group=$1
    local timestamp=$(date '+%Y-%m-%d %H:%M:%S')
    
    kafka-consumer-groups.sh 
        --bootstrap-server $BOOTSTRAP_SERVER 
        --group $group 
        --describe 2>/dev/null | 
        awk -v group="$group" -v ts="$timestamp" '
        NR>1 && $6 ~ /^[0-9]+$/ {
            lag = $6
            topic = $2
            partition = $3
            total_lag += lag
            if (lag > max_lag) {
                max_lag = lag
                max_partition = partition
            }
        }
        END {
            printf "%s | GROUP: %s | TOTAL_LAG: %d | MAX_PARTITION_LAG: %d (partition: %s)n",
                ts, group, total_lag, max_lag, max_partition
        }'
}

# Consumer group listesini al
GROUPS=$(kafka-consumer-groups.sh 
    --bootstrap-server $BOOTSTRAP_SERVER 
    --list 2>/dev/null)

for group in $GROUPS; do
    result=$(check_lag $group)
    echo $result | tee -a $LOG_FILE
    
    # Threshold kontrolü
    total=$(echo $result | grep -oP 'TOTAL_LAG: K[0-9]+')
    if [ -n "$total" ] && [ "$total" -gt "$ALERT_THRESHOLD" ]; then
        echo "ALERT: $group consumer group lag exceeded threshold! LAG: $total" | 
            mail -s "Kafka Consumer Lag Alert" [email protected]
    fi
done

Prometheus ve Grafana ile Lag Monitoring

Production ortamında bash script’lerle uğraşmak yerine proper monitoring stack kurman gerekiyor.

Kafka Exporter Kurulumu

# Docker ile kafka-exporter başlat
docker run -d 
    --name kafka-exporter 
    --restart unless-stopped 
    -p 9308:9308 
    danielqsj/kafka-exporter:latest 
    --kafka.server=kafka1:9092 
    --kafka.server=kafka2:9092 
    --kafka.server=kafka3:9092 
    --group.filter=".*-consumer$" 
    --topic.filter="^(orders|payments|notifications).*"

# Exporter'ın çalıştığını doğrula
curl -s http://localhost:9308/metrics | grep kafka_consumer_lag

Grafana Alert Kuralı

Kritik lag durumları için Grafana’da şu PromQL sorgularını kullanabilirsin:

# prometheus/rules/kafka-lag.yml
groups:
  - name: kafka_consumer_lag
    interval: 30s
    rules:
      - alert: KafkaConsumerLagHigh
        expr: |
          sum by (consumergroup, topic) (
            kafka_consumer_group_lag{consumergroup!=""}
          ) > 50000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Kafka consumer lag yüksek"
          description: "{{ $labels.consumergroup }} grubu {{ $labels.topic }} topic'inde {{ $value }} lag var"

      - alert: KafkaConsumerLagCritical
        expr: |
          sum by (consumergroup, topic) (
            kafka_consumer_group_lag{consumergroup!=""}
          ) > 500000
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Kafka consumer lag kritik seviyede"
          description: "Acil müdahale gerekli: {{ $labels.consumergroup }}"

      - alert: KafkaConsumerGroupDead
        expr: |
          kafka_consumer_group_members{consumergroup!=""} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Consumer group üyesi yok"

Lag Analizi: Kök Neden Tespiti

Lag tespit ettikten sonra asıl iş başlıyor: neden kaynaklandığını bulmak.

Consumer Performance Analizi

#!/bin/bash
# consumer-perf-analysis.sh
# Consumer'ın işlem hızını ölç ve darboğazı tespit et

BOOTSTRAP_SERVER="kafka1:9092"
TOPIC="orders"
GROUP="order-processor"
SAMPLE_INTERVAL=60  # saniye

echo "Consumer performans analizi başlıyor..."
echo "Topic: $TOPIC | Group: $GROUP | Süre: ${SAMPLE_INTERVAL}s"

# İlk ölçüm
OFFSET_START=$(kafka-consumer-groups.sh 
    --bootstrap-server $BOOTSTRAP_SERVER 
    --group $GROUP 
    --describe 2>/dev/null | 
    awk 'NR>1 && $6 ~ /^[0-9]+$/ {sum += $5} END {print sum}')

LAG_START=$(kafka-consumer-groups.sh 
    --bootstrap-server $BOOTSTRAP_SERVER 
    --group $GROUP 
    --describe 2>/dev/null | 
    awk 'NR>1 && $6 ~ /^[0-9]+$/ {sum += $6} END {print sum}')

echo "Başlangıç offset: $OFFSET_START | Başlangıç lag: $LAG_START"
sleep $SAMPLE_INTERVAL

# İkinci ölçüm
OFFSET_END=$(kafka-consumer-groups.sh 
    --bootstrap-server $BOOTSTRAP_SERVER 
    --group $GROUP 
    --describe 2>/dev/null | 
    awk 'NR>1 && $6 ~ /^[0-9]+$/ {sum += $5} END {print sum}')

LAG_END=$(kafka-consumer-groups.sh 
    --bootstrap-server $BOOTSTRAP_SERVER 
    --group $GROUP 
    --describe 2>/dev/null | 
    awk 'NR>1 && $6 ~ /^[0-9]+$/ {sum += $6} END {print sum}')

CONSUMED=$((OFFSET_END - OFFSET_START))
LAG_CHANGE=$((LAG_END - LAG_START))
RATE=$((CONSUMED / SAMPLE_INTERVAL))

echo "--- Analiz Sonucu ---"
echo "İşlenen mesaj: $CONSUMED"
echo "İşleme hızı: $RATE msg/s"
echo "Lag değişimi: $LAG_CHANGE (- azalıyor, + artıyor)"

if [ $LAG_CHANGE -gt 0 ]; then
    echo "UYARI: Consumer üretim hızının gerisinde kalıyor!"
    echo "Önerilen aksiyon: Consumer instance sayısını artır"
fi

Partition Bazlı Hot Spot Tespiti

Lag sorununun belirli partition’larda yoğunlaşması, partition dengesizliğine işaret eder:

# Partition bazlı lag dağılımını analiz et
kafka-consumer-groups.sh 
    --bootstrap-server localhost:9092 
    --group payment-processor 
    --describe 2>/dev/null | 
    awk 'NR>1 && $6 ~ /^[0-9]+$/ {
        printf "Partition %s: LAG=%s, CONSUMER=%sn", $3, $6, $7
    }' | sort -t= -k2 -rn | head -20

Eğer bazı partition’larda lag çok yüksek, bazılarında düşükse şu soruları sormalısın:

  • Partition key dağılımı adaletsiz mi? Belirli key’ler çok fazla mesaj üretiyorsa hot partition oluşur
  • Consumer sayısı partition sayısına eşit mi? Partition sayısından az consumer varsa bazı consumer’lar birden fazla partition işler
  • Consumer’lar arasında işlem süresi farkı var mı? Bazı mesajların işlenmesi daha uzun sürüyorsa o consumer geride kalır

Otomatik Ölçeklendirme Stratejileri

KEDA ile Kubernetes Üzerinde Otomatik Ölçeklendirme

KEDA (Kubernetes Event-driven Autoscaling), Kafka lag’ine göre pod sayısını otomatik olarak ayarlamanın en temiz yoludur:

# keda-kafka-scaledobject.yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: order-consumer-scaler
  namespace: production
spec:
  scaleTargetRef:
    name: order-consumer-deployment
  pollingInterval: 15        # 15 saniyede bir kontrol et
  cooldownPeriod: 300        # Scale-down öncesi 5 dakika bekle
  minReplicaCount: 2         # Minimum 2 consumer her zaman çalışsın
  maxReplicaCount: 20        # Topic'teki partition sayısını geçme!
  triggers:
    - type: kafka
      metadata:
        bootstrapServers: kafka1:9092,kafka2:9092,kafka3:9092
        consumerGroup: order-processor
        topic: orders
        lagThreshold: "5000"        # Her consumer için hedef lag
        offsetResetPolicy: latest
      authenticationRef:
        name: kafka-trigger-auth
---
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: kafka-trigger-auth
  namespace: production
spec:
  secretTargetRef:
    - parameter: sasl.username
      name: kafka-credentials
      key: username
    - parameter: sasl.password
      name: kafka-credentials
      key: password

Bu yapılandırmada lagThreshold: “5000” değeri kritiktir. KEDA, toplam lag’i bu değere bölerek gereken replica sayısını hesaplar. Toplam lag 50.000 ise ve threshold 5.000 ise, 10 consumer instance başlatır.

Custom HPA ile Lag Tabanlı Ölçeklendirme

KEDA kullanmıyorsan, Prometheus custom metrics üzerinden HPA kurabilirsin:

# kafka-hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: payment-consumer-hpa
  namespace: production
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: payment-consumer
  minReplicas: 3
  maxReplicas: 15
  metrics:
    - type: External
      external:
        metric:
          name: kafka_consumer_group_lag_sum
          selector:
            matchLabels:
              consumergroup: payment-processor
              topic: payments
        target:
          type: AverageValue
          averageValue: "3000"
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Pods
          value: 3
          periodSeconds: 60
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Pods
          value: 1
          periodSeconds: 120

behavior bloğundaki ayarlar çok önemli. Scale-up’ı hızlı ama scale-down’ı yavaş tutmak, lag spike’larına hızlı tepki verirken gereksiz pod terminate/restart döngüsünü önler.

Lag Düşürme Teknikleri

Ölçeklendirme yeterli olmadığında lag’i düşürmek için şu teknikleri uygulayabilirsin:

# Consumer group'u geçici olarak en son offset'e resetle
# DİKKAT: Mesaj kaybına yol açar, sadece replay kabul edilebilir durumlarda kullan
kafka-consumer-groups.sh 
    --bootstrap-server localhost:9092 
    --group notification-consumer 
    --topic user-notifications 
    --reset-offsets 
    --to-latest 
    --execute

# Belirli bir tarihe göre offset resetle (örn: son 1 saatin mesajlarını işle)
kafka-consumer-groups.sh 
    --bootstrap-server localhost:9092 
    --group analytics-consumer 
    --topic page-views 
    --reset-offsets 
    --to-datetime 2024-01-15T10:00:00.000 
    --execute

# Sadece belirli partition'ı resetle
kafka-consumer-groups.sh 
    --bootstrap-server localhost:9092 
    --group order-consumer 
    --topic orders:5 
    --reset-offsets 
    --to-offset 1250000 
    --execute

Gerçek Dünya Senaryosu: Flash Sale Lag Krizi

Bir e-ticaret platformunda çalışan arkadaşlardan duyduğum ve benzer şekilde yaşadığım bir senaryo: Flash sale başlıyor, 5 dakika içinde order topic’inde lag 0’dan 2 milyona çıkıyor. Normal consumer sayısı olan 5 pod bu yükü kaldıramıyor.

Bu tür öngörülebilir yük artışları için proaktif ölçeklendirme script’i:

#!/bin/bash
# pre-scale.sh - Flash sale öncesi consumer'ları önceden ölçeklendir

NAMESPACE="production"
DEPLOYMENT="order-consumer"
FLASH_SALE_REPLICAS=20
NORMAL_REPLICAS=5

scale_for_event() {
    local replicas=$1
    local reason=$2
    
    echo "[$(date)] $reason - Scaling $DEPLOYMENT to $replicas replicas"
    
    kubectl scale deployment/$DEPLOYMENT 
        --replicas=$replicas 
        -n $NAMESPACE
    
    # Tüm pod'ların ready olmasını bekle
    kubectl rollout status deployment/$DEPLOYMENT 
        -n $NAMESPACE 
        --timeout=120s
    
    echo "[$(date)] Scale tamamlandı. Mevcut pod sayısı:"
    kubectl get pods -n $NAMESPACE -l app=$DEPLOYMENT | grep Running | wc -l
}

# Flash sale başlamadan 10 dakika önce çağrılacak
case $1 in
    "prepare")
        scale_for_event $FLASH_SALE_REPLICAS "Flash sale hazırlığı"
        # KEDA'nın otomatik scale-down yapmasını geçici olarak durdur
        kubectl annotate scaledobject order-consumer-scaler 
            autoscaling.keda.sh/paused=true 
            -n $NAMESPACE
        ;;
    "restore")
        # Flash sale bitti, KEDA'yı tekrar devreye al
        kubectl annotate scaledobject order-consumer-scaler 
            autoscaling.keda.sh/paused- 
            -n $NAMESPACE
        echo "KEDA otomatik ölçeklendirme tekrar aktif"
        ;;
    *)
        echo "Kullanım: $0 {prepare|restore}"
        exit 1
        ;;
esac

Consumer Konfigürasyon Optimizasyonu

Bazen lag sorunu ölçeklendirmeyle değil, consumer konfigürasyonuyla çözülür:

# consumer.properties - Performans odaklı konfigürasyon

# Fetch boyutunu artır (varsayılan 1MB, bunu 50MB'a çıkar)
fetch.min.bytes=52428800
fetch.max.bytes=104857600
max.partition.fetch.bytes=10485760

# Tek poll'da daha fazla mesaj al
max.poll.records=1000

# Poll timeout'u artır (uzun işlemler için)
max.poll.interval.ms=600000

# Session timeout'u ayarla
session.timeout.ms=45000
heartbeat.interval.ms=15000

# Auto commit'i kapat, manuel commit kullan
enable.auto.commit=false

# Commit sıklığını artır
auto.commit.interval.ms=1000

Bu ayarları yaptıktan sonra consumer’ın mesajları ne kadar hızlı işlediğini ölçmek için şu komutu kullanabilirsin:

# Consumer performance test
kafka-consumer-perf-test.sh 
    --bootstrap-server localhost:9092 
    --topic orders 
    --group perf-test-group 
    --messages 100000 
    --threads 4 
    --show-detailed-stats 
    --reporting-interval 5000

Sonuç

Kafka consumer lag yönetimi tek bir araç veya tek bir stratejiyle çözülecek bir problem değil. Etkili bir lag yönetimi için şu katmanları bir arada kullanman gerekiyor:

  • Görünürlük katmanı: Prometheus + Grafana + kafka-exporter ile anlık ve tarihsel lag verisi
  • Uyarı katmanı: Lag threshold’larına göre çok seviyeli alerting (warning, critical)
  • Otomasyon katmanı: KEDA veya custom HPA ile lag’e tepkisel ölçeklendirme
  • Proaktif katman: Öngörülebilir yük artışları için zamanlanmış ölçeklendirme
  • Optimizasyon katmanı: Consumer konfigürasyonu ve kod seviyesinde iyileştirmeler

En önemli ders şu: Lag’i gördükten sonra müdahale etmek reaktiftir ve çoğu zaman geç kalırsın. İyi bir monitoring ile lag’in artış trendini erken tespit etmek, sorunu iş etkisi oluşmadan çözmeni sağlar. Production’da sıfır sürpriz, en güzel hedeftir.

Bir sonraki yazıda Kafka broker tarafındaki performans sorunlarını ve partition liderlik optimizasyonunu ele alacağız.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir