Kafka Consumer Lag Yönetimi: Gecikme Tespiti, Analizi ve Otomatik Ölçeklendirme Stratejileri
Production ortamında Kafka cluster’ı yönetirken en çok baş ağrıtan konulardan biri consumer lag’i kontrol altına almaktır. Bir sabah uyandığında lag’in milyonlara ulaştığını görmek, o günün tamamını berbat etmeye yeterlidir. Bu yazıda consumer lag’i nasıl tespit edeceğini, analiz edeceğini ve otomatik ölçeklendirme stratejileriyle nasıl yöneteceğini ele alacağız.
Consumer Lag Nedir ve Neden Önemlidir?
Consumer lag, Kafka topic’indeki son mesajın offset’i ile consumer’ın şu an işlediği offset arasındaki farktır. Basitçe söylemek gerekirse, consumer’ın producer’ın gerisinde ne kadar kaldığını gösterir.
Matematiksel olarak şöyle ifade edilir:
Lag = Log End Offset – Current Offset
Sıfır lag idealdir ama gerçek dünyada her zaman biraz lag olur. Sorun, bu lag’in kontrolsüzce büyümesidir. E-ticaret sistemlerinde sipariş işleme pipeline’ında lag büyüdüğünde müşteriler siparişlerinin durumunu göremez, fraud detection sistemleri geride kalır, bildirimler geç gider. Yani lag soyut bir metrik değil, doğrudan iş etkisi olan bir operasyonel sorundur.
Lag Tespiti: Temel Araçlar ve Komutlar
Kafka Native Araçlarla Lag Kontrolü
Kafka’nın kendi araçlarıyla hızlıca lag durumunu görmek için şu komutları kullanabilirsin:
# Tüm consumer group'larını listele
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# Belirli bir consumer group'un detaylı lag bilgisi
kafka-consumer-groups.sh
--bootstrap-server localhost:9092
--group my-payment-consumer
--describe
# Tüm group'ların lag özetini göster
kafka-consumer-groups.sh
--bootstrap-server localhost:9092
--all-groups
--describe 2>/dev/null | awk 'NR==1 || /[0-9]+$/'
Bu komutun çıktısında şu kolonları göreceksin:
- GROUP: Consumer group adı
- TOPIC: İzlenen topic
- PARTITION: Partition numarası
- CURRENT-OFFSET: Consumer’ın son commit ettiği offset
- LOG-END-OFFSET: Topic’teki son mesajın offset’i
- LAG: İkisi arasındaki fark
- CONSUMER-ID: Consumer instance kimliği
- HOST: Consumer’ın çalıştığı sunucu
- CLIENT-ID: Consumer client kimliği
Lag İzleme Script’i
Monitoring sistemine entegre edebileceğin basit ama etkili bir bash script’i:
#!/bin/bash
# kafka-lag-monitor.sh
BOOTSTRAP_SERVER="kafka1:9092,kafka2:9092,kafka3:9092"
ALERT_THRESHOLD=10000
LOG_FILE="/var/log/kafka-lag-monitor.log"
check_lag() {
local group=$1
local timestamp=$(date '+%Y-%m-%d %H:%M:%S')
kafka-consumer-groups.sh
--bootstrap-server $BOOTSTRAP_SERVER
--group $group
--describe 2>/dev/null |
awk -v group="$group" -v ts="$timestamp" '
NR>1 && $6 ~ /^[0-9]+$/ {
lag = $6
topic = $2
partition = $3
total_lag += lag
if (lag > max_lag) {
max_lag = lag
max_partition = partition
}
}
END {
printf "%s | GROUP: %s | TOTAL_LAG: %d | MAX_PARTITION_LAG: %d (partition: %s)n",
ts, group, total_lag, max_lag, max_partition
}'
}
# Consumer group listesini al
GROUPS=$(kafka-consumer-groups.sh
--bootstrap-server $BOOTSTRAP_SERVER
--list 2>/dev/null)
for group in $GROUPS; do
result=$(check_lag $group)
echo $result | tee -a $LOG_FILE
# Threshold kontrolü
total=$(echo $result | grep -oP 'TOTAL_LAG: K[0-9]+')
if [ -n "$total" ] && [ "$total" -gt "$ALERT_THRESHOLD" ]; then
echo "ALERT: $group consumer group lag exceeded threshold! LAG: $total" |
mail -s "Kafka Consumer Lag Alert" [email protected]
fi
done
Prometheus ve Grafana ile Lag Monitoring
Production ortamında bash script’lerle uğraşmak yerine proper monitoring stack kurman gerekiyor.
Kafka Exporter Kurulumu
# Docker ile kafka-exporter başlat
docker run -d
--name kafka-exporter
--restart unless-stopped
-p 9308:9308
danielqsj/kafka-exporter:latest
--kafka.server=kafka1:9092
--kafka.server=kafka2:9092
--kafka.server=kafka3:9092
--group.filter=".*-consumer$"
--topic.filter="^(orders|payments|notifications).*"
# Exporter'ın çalıştığını doğrula
curl -s http://localhost:9308/metrics | grep kafka_consumer_lag
Grafana Alert Kuralı
Kritik lag durumları için Grafana’da şu PromQL sorgularını kullanabilirsin:
# prometheus/rules/kafka-lag.yml
groups:
- name: kafka_consumer_lag
interval: 30s
rules:
- alert: KafkaConsumerLagHigh
expr: |
sum by (consumergroup, topic) (
kafka_consumer_group_lag{consumergroup!=""}
) > 50000
for: 5m
labels:
severity: warning
annotations:
summary: "Kafka consumer lag yüksek"
description: "{{ $labels.consumergroup }} grubu {{ $labels.topic }} topic'inde {{ $value }} lag var"
- alert: KafkaConsumerLagCritical
expr: |
sum by (consumergroup, topic) (
kafka_consumer_group_lag{consumergroup!=""}
) > 500000
for: 2m
labels:
severity: critical
annotations:
summary: "Kafka consumer lag kritik seviyede"
description: "Acil müdahale gerekli: {{ $labels.consumergroup }}"
- alert: KafkaConsumerGroupDead
expr: |
kafka_consumer_group_members{consumergroup!=""} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "Consumer group üyesi yok"
Lag Analizi: Kök Neden Tespiti
Lag tespit ettikten sonra asıl iş başlıyor: neden kaynaklandığını bulmak.
Consumer Performance Analizi
#!/bin/bash
# consumer-perf-analysis.sh
# Consumer'ın işlem hızını ölç ve darboğazı tespit et
BOOTSTRAP_SERVER="kafka1:9092"
TOPIC="orders"
GROUP="order-processor"
SAMPLE_INTERVAL=60 # saniye
echo "Consumer performans analizi başlıyor..."
echo "Topic: $TOPIC | Group: $GROUP | Süre: ${SAMPLE_INTERVAL}s"
# İlk ölçüm
OFFSET_START=$(kafka-consumer-groups.sh
--bootstrap-server $BOOTSTRAP_SERVER
--group $GROUP
--describe 2>/dev/null |
awk 'NR>1 && $6 ~ /^[0-9]+$/ {sum += $5} END {print sum}')
LAG_START=$(kafka-consumer-groups.sh
--bootstrap-server $BOOTSTRAP_SERVER
--group $GROUP
--describe 2>/dev/null |
awk 'NR>1 && $6 ~ /^[0-9]+$/ {sum += $6} END {print sum}')
echo "Başlangıç offset: $OFFSET_START | Başlangıç lag: $LAG_START"
sleep $SAMPLE_INTERVAL
# İkinci ölçüm
OFFSET_END=$(kafka-consumer-groups.sh
--bootstrap-server $BOOTSTRAP_SERVER
--group $GROUP
--describe 2>/dev/null |
awk 'NR>1 && $6 ~ /^[0-9]+$/ {sum += $5} END {print sum}')
LAG_END=$(kafka-consumer-groups.sh
--bootstrap-server $BOOTSTRAP_SERVER
--group $GROUP
--describe 2>/dev/null |
awk 'NR>1 && $6 ~ /^[0-9]+$/ {sum += $6} END {print sum}')
CONSUMED=$((OFFSET_END - OFFSET_START))
LAG_CHANGE=$((LAG_END - LAG_START))
RATE=$((CONSUMED / SAMPLE_INTERVAL))
echo "--- Analiz Sonucu ---"
echo "İşlenen mesaj: $CONSUMED"
echo "İşleme hızı: $RATE msg/s"
echo "Lag değişimi: $LAG_CHANGE (- azalıyor, + artıyor)"
if [ $LAG_CHANGE -gt 0 ]; then
echo "UYARI: Consumer üretim hızının gerisinde kalıyor!"
echo "Önerilen aksiyon: Consumer instance sayısını artır"
fi
Partition Bazlı Hot Spot Tespiti
Lag sorununun belirli partition’larda yoğunlaşması, partition dengesizliğine işaret eder:
# Partition bazlı lag dağılımını analiz et
kafka-consumer-groups.sh
--bootstrap-server localhost:9092
--group payment-processor
--describe 2>/dev/null |
awk 'NR>1 && $6 ~ /^[0-9]+$/ {
printf "Partition %s: LAG=%s, CONSUMER=%sn", $3, $6, $7
}' | sort -t= -k2 -rn | head -20
Eğer bazı partition’larda lag çok yüksek, bazılarında düşükse şu soruları sormalısın:
- Partition key dağılımı adaletsiz mi? Belirli key’ler çok fazla mesaj üretiyorsa hot partition oluşur
- Consumer sayısı partition sayısına eşit mi? Partition sayısından az consumer varsa bazı consumer’lar birden fazla partition işler
- Consumer’lar arasında işlem süresi farkı var mı? Bazı mesajların işlenmesi daha uzun sürüyorsa o consumer geride kalır
Otomatik Ölçeklendirme Stratejileri
KEDA ile Kubernetes Üzerinde Otomatik Ölçeklendirme
KEDA (Kubernetes Event-driven Autoscaling), Kafka lag’ine göre pod sayısını otomatik olarak ayarlamanın en temiz yoludur:
# keda-kafka-scaledobject.yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: order-consumer-scaler
namespace: production
spec:
scaleTargetRef:
name: order-consumer-deployment
pollingInterval: 15 # 15 saniyede bir kontrol et
cooldownPeriod: 300 # Scale-down öncesi 5 dakika bekle
minReplicaCount: 2 # Minimum 2 consumer her zaman çalışsın
maxReplicaCount: 20 # Topic'teki partition sayısını geçme!
triggers:
- type: kafka
metadata:
bootstrapServers: kafka1:9092,kafka2:9092,kafka3:9092
consumerGroup: order-processor
topic: orders
lagThreshold: "5000" # Her consumer için hedef lag
offsetResetPolicy: latest
authenticationRef:
name: kafka-trigger-auth
---
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
name: kafka-trigger-auth
namespace: production
spec:
secretTargetRef:
- parameter: sasl.username
name: kafka-credentials
key: username
- parameter: sasl.password
name: kafka-credentials
key: password
Bu yapılandırmada lagThreshold: “5000” değeri kritiktir. KEDA, toplam lag’i bu değere bölerek gereken replica sayısını hesaplar. Toplam lag 50.000 ise ve threshold 5.000 ise, 10 consumer instance başlatır.
Custom HPA ile Lag Tabanlı Ölçeklendirme
KEDA kullanmıyorsan, Prometheus custom metrics üzerinden HPA kurabilirsin:
# kafka-hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: payment-consumer-hpa
namespace: production
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: payment-consumer
minReplicas: 3
maxReplicas: 15
metrics:
- type: External
external:
metric:
name: kafka_consumer_group_lag_sum
selector:
matchLabels:
consumergroup: payment-processor
topic: payments
target:
type: AverageValue
averageValue: "3000"
behavior:
scaleUp:
stabilizationWindowSeconds: 60
policies:
- type: Pods
value: 3
periodSeconds: 60
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Pods
value: 1
periodSeconds: 120
behavior bloğundaki ayarlar çok önemli. Scale-up’ı hızlı ama scale-down’ı yavaş tutmak, lag spike’larına hızlı tepki verirken gereksiz pod terminate/restart döngüsünü önler.
Lag Düşürme Teknikleri
Ölçeklendirme yeterli olmadığında lag’i düşürmek için şu teknikleri uygulayabilirsin:
# Consumer group'u geçici olarak en son offset'e resetle
# DİKKAT: Mesaj kaybına yol açar, sadece replay kabul edilebilir durumlarda kullan
kafka-consumer-groups.sh
--bootstrap-server localhost:9092
--group notification-consumer
--topic user-notifications
--reset-offsets
--to-latest
--execute
# Belirli bir tarihe göre offset resetle (örn: son 1 saatin mesajlarını işle)
kafka-consumer-groups.sh
--bootstrap-server localhost:9092
--group analytics-consumer
--topic page-views
--reset-offsets
--to-datetime 2024-01-15T10:00:00.000
--execute
# Sadece belirli partition'ı resetle
kafka-consumer-groups.sh
--bootstrap-server localhost:9092
--group order-consumer
--topic orders:5
--reset-offsets
--to-offset 1250000
--execute
Gerçek Dünya Senaryosu: Flash Sale Lag Krizi
Bir e-ticaret platformunda çalışan arkadaşlardan duyduğum ve benzer şekilde yaşadığım bir senaryo: Flash sale başlıyor, 5 dakika içinde order topic’inde lag 0’dan 2 milyona çıkıyor. Normal consumer sayısı olan 5 pod bu yükü kaldıramıyor.
Bu tür öngörülebilir yük artışları için proaktif ölçeklendirme script’i:
#!/bin/bash
# pre-scale.sh - Flash sale öncesi consumer'ları önceden ölçeklendir
NAMESPACE="production"
DEPLOYMENT="order-consumer"
FLASH_SALE_REPLICAS=20
NORMAL_REPLICAS=5
scale_for_event() {
local replicas=$1
local reason=$2
echo "[$(date)] $reason - Scaling $DEPLOYMENT to $replicas replicas"
kubectl scale deployment/$DEPLOYMENT
--replicas=$replicas
-n $NAMESPACE
# Tüm pod'ların ready olmasını bekle
kubectl rollout status deployment/$DEPLOYMENT
-n $NAMESPACE
--timeout=120s
echo "[$(date)] Scale tamamlandı. Mevcut pod sayısı:"
kubectl get pods -n $NAMESPACE -l app=$DEPLOYMENT | grep Running | wc -l
}
# Flash sale başlamadan 10 dakika önce çağrılacak
case $1 in
"prepare")
scale_for_event $FLASH_SALE_REPLICAS "Flash sale hazırlığı"
# KEDA'nın otomatik scale-down yapmasını geçici olarak durdur
kubectl annotate scaledobject order-consumer-scaler
autoscaling.keda.sh/paused=true
-n $NAMESPACE
;;
"restore")
# Flash sale bitti, KEDA'yı tekrar devreye al
kubectl annotate scaledobject order-consumer-scaler
autoscaling.keda.sh/paused-
-n $NAMESPACE
echo "KEDA otomatik ölçeklendirme tekrar aktif"
;;
*)
echo "Kullanım: $0 {prepare|restore}"
exit 1
;;
esac
Consumer Konfigürasyon Optimizasyonu
Bazen lag sorunu ölçeklendirmeyle değil, consumer konfigürasyonuyla çözülür:
# consumer.properties - Performans odaklı konfigürasyon
# Fetch boyutunu artır (varsayılan 1MB, bunu 50MB'a çıkar)
fetch.min.bytes=52428800
fetch.max.bytes=104857600
max.partition.fetch.bytes=10485760
# Tek poll'da daha fazla mesaj al
max.poll.records=1000
# Poll timeout'u artır (uzun işlemler için)
max.poll.interval.ms=600000
# Session timeout'u ayarla
session.timeout.ms=45000
heartbeat.interval.ms=15000
# Auto commit'i kapat, manuel commit kullan
enable.auto.commit=false
# Commit sıklığını artır
auto.commit.interval.ms=1000
Bu ayarları yaptıktan sonra consumer’ın mesajları ne kadar hızlı işlediğini ölçmek için şu komutu kullanabilirsin:
# Consumer performance test
kafka-consumer-perf-test.sh
--bootstrap-server localhost:9092
--topic orders
--group perf-test-group
--messages 100000
--threads 4
--show-detailed-stats
--reporting-interval 5000
Sonuç
Kafka consumer lag yönetimi tek bir araç veya tek bir stratejiyle çözülecek bir problem değil. Etkili bir lag yönetimi için şu katmanları bir arada kullanman gerekiyor:
- Görünürlük katmanı: Prometheus + Grafana + kafka-exporter ile anlık ve tarihsel lag verisi
- Uyarı katmanı: Lag threshold’larına göre çok seviyeli alerting (warning, critical)
- Otomasyon katmanı: KEDA veya custom HPA ile lag’e tepkisel ölçeklendirme
- Proaktif katman: Öngörülebilir yük artışları için zamanlanmış ölçeklendirme
- Optimizasyon katmanı: Consumer konfigürasyonu ve kod seviyesinde iyileştirmeler
En önemli ders şu: Lag’i gördükten sonra müdahale etmek reaktiftir ve çoğu zaman geç kalırsın. İyi bir monitoring ile lag’in artış trendini erken tespit etmek, sorunu iş etkisi oluşmadan çözmeni sağlar. Production’da sıfır sürpriz, en güzel hedeftir.
Bir sonraki yazıda Kafka broker tarafındaki performans sorunlarını ve partition liderlik optimizasyonunu ele alacağız.
