Kafka Log Compaction: Veri Saklama Politikalarının Yapılandırılması ve Yönetimi

Kafka cluster’ınızda zamanla biriken veriyi yönetmek, özellikle production ortamlarında ciddi bir sorun haline gelebilir. Disk dolmaya başladığında panic moduna geçmek yerine, Kafka’nın sunduğu log compaction mekanizmasını doğru yapılandırmak hem disk kullanımını kontrol altına alır hem de veri tutarlılığını garantiler. Bu yazıda log compaction’ı derinlemesine inceleyeceğiz ve gerçek dünya senaryolarıyla nasıl yapılandırılacağını ele alacağız.

Log Compaction Nedir ve Neden Önemlidir?

Kafka’da varsayılan veri saklama politikası zaman ya da boyut bazlıdır. Yani belirli bir süre geçen ya da belirli bir boyutu aşan mesajlar silinir. Ancak bazı kullanım senaryolarında, örneğin bir kullanıcının son adres bilgisini ya da bir ürünün güncel fiyatını tutmak istediğinizde, eski mesajların tamamını değil yalnızca her anahtar için en son değeri saklamak istersiniz. İşte tam burada log compaction devreye girer.

Log compaction, Kafka’nın aynı key’e sahip eski mesajları temizleyip yalnızca en güncel değeri tutmasını sağlayan bir mekanizmadır. Bunu bir anahtar-değer veritabanının güncelleme operasyonu gibi düşünebilirsiniz. Her key için en son write kazanır, eskiler compaction sürecinde temizlenir.

Bu yaklaşım özellikle şu senaryolarda kritik öneme sahiptir:

  • Event sourcing uygulamaları: Sadece son durumu yeniden oluşturmanız gerektiğinde
  • Database change data capture (CDC): Veritabanı değişikliklerini Kafka üzerinden yayarken
  • Cache invalidation: Servisler arası durum senkronizasyonu sağlarken
  • Kullanıcı profili güncellemeleri: Milyonlarca kullanıcının son profil bilgisini saklamak istediğinizde

Log Compaction’ın İç Yapısı

Kafka log dosyaları segment’lere bölünmüştür. Her partition bir dizi segment dosyasından oluşur ve bunlar ikiye ayrılır: clean (daha önce compaction geçirmiş) ve dirty (henüz compaction görmemiş) bölümler. Cleaner thread arka planda çalışarak dirty ratio belirli bir eşiği aştığında devreye girer.

Compaction süreci şu adımlarla işler:

  • Cleaner thread dirty segment’leri tarar ve her key için en son offset’i bir map’e yazar
  • Bu map kullanılarak eski değerlere sahip mesajlar atlanır, yeni değerler yeni bir segment’e yazılır
  • Eski segment’ler silinir ve yeni compacted segment aktif hale gelir
  • Aktif (head) segment’e compaction uygulanmaz, o her zaman tüm mesajları tutar

Önemli bir detay: tombstone kavramı. Eğer bir key’i tamamen silmek istiyorsanız, o key için null değerli bir mesaj (tombstone) gönderirsiniz. Compaction bu mesajı bir süre tutar, daha sonra tamamen temizler.

Topic Seviyesinde Log Compaction Yapılandırması

Yeni bir topic oluştururken compaction’ı etkinleştirmek oldukça basittir:

kafka-topics.sh --bootstrap-server localhost:9092 
  --create 
  --topic user-profiles 
  --partitions 6 
  --replication-factor 3 
  --config cleanup.policy=compact

Mevcut bir topic’i compaction moduna geçirmek için:

kafka-configs.sh --bootstrap-server localhost:9092 
  --entity-type topics 
  --entity-name user-profiles 
  --alter 
  --add-config cleanup.policy=compact

Bazı durumlarda hem zaman bazlı silme hem de compaction’ı birlikte kullanmak isteyebilirsiniz. Örneğin son 7 günden eski compacted mesajları da silmek için:

kafka-configs.sh --bootstrap-server localhost:9092 
  --entity-type topics 
  --entity-name user-profiles 
  --alter 
  --add-config 'cleanup.policy=compact,delete,retention.ms=604800000'

Bu kombinasyon özellikle tombstone mesajlarının da sonunda temizlenmesini garantilemek için kullanışlıdır.

Broker Seviyesinde Compaction Ayarları

Broker yapılandırması, compaction’ın ne kadar agresif çalışacağını belirler. server.properties dosyasında aşağıdaki parametreleri dikkatli ayarlamak gerekir:

# /etc/kafka/server.properties

# Cleaner thread sayısı - yüksek I/O kapasiteli sunucularda artırabilirsiniz
log.cleaner.threads=4

# Cleaner'ın I/O bant genişliği limiti (bytes/saniye) - 0 sınırsız demektir
log.cleaner.io.max.bytes.per.second=0

# Dirty ratio eşiği - bir partition'ın dirty kısmı toplamın %50'sini geçince compaction başlar
log.cleaner.min.cleanable.ratio=0.5

# Compaction sonrası bir mesajın minimum yaşı (ms) - 12 saat
log.cleaner.min.compaction.lag.ms=43200000

# Compaction öncesi bir mesajın maksimum yaşı (ms) - 7 gün
log.cleaner.max.compaction.lag.ms=604800000

# Cleaner buffer boyutu - büyük topic'ler için artırın
log.cleaner.dedupe.buffer.size=134217728

log.cleaner.min.cleanable.ratio: Bu parametre en kritik olanıdır. 0.5 değeri, dirty mesajların toplam log boyutunun yüzde 50’sini aşması durumunda cleaner’ın devreye gireceği anlamına gelir. Bu değeri düşürürseniz daha sık ama daha küçük compaction işlemleri gerçekleşir, artırırsanız daha seyrek ama daha büyük compaction döngüleri yaşanır.

log.cleaner.min.compaction.lag.ms: Bir mesajın compaction’a tabi tutulmadan önce ne kadar süre beklemesi gerektiğini belirler. Bu değer, consumer’larınızın mesajları işlemek için yeterli zamanı olmasını garantiler. Production ortamında bu değeri en az birkaç saat olarak ayarlamak iyi bir pratiktir.

Gerçek Dünya Senaryosu: E-Ticaret Ürün Kataloğu

Diyelim ki büyük bir e-ticaret platformunda çalışıyorsunuz ve ürün fiyat değişikliklerini Kafka üzerinden yayınlıyorsunuz. Günde milyonlarca fiyat güncellemesi geliyor ve son 30 gün içindeki tüm geçmişi saklamanıza gerek yok, sadece her ürünün güncel fiyatını tutmak yeterli.

Önce topic’i oluşturun:

kafka-topics.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 
  --create 
  --topic product-prices 
  --partitions 12 
  --replication-factor 3 
  --config cleanup.policy=compact 
  --config min.compaction.lag.ms=3600000 
  --config segment.bytes=104857600 
  --config segment.ms=3600000

Bu yapılandırmada segment boyutunu 100MB ile sınırlandırdık ve her saatte bir yeni segment oluşturulmasını sağladık. Bu sayede compaction daha küçük ve yönetilebilir parçalar üzerinde çalışır.

Python ile basit bir producer yazalım:

from confluent_kafka import Producer
import json

conf = {
    'bootstrap.servers': 'kafka1:9092,kafka2:9092,kafka3:9092',
    'acks': 'all',
    'enable.idempotence': True
}

producer = Producer(conf)

def update_product_price(product_id, price, currency='TRY'):
    key = str(product_id).encode('utf-8')
    value = json.dumps({
        'product_id': product_id,
        'price': price,
        'currency': currency,
        'updated_at': '2024-01-15T10:30:00Z'
    }).encode('utf-8')
    
    producer.produce(
        topic='product-prices',
        key=key,
        value=value,
        callback=lambda err, msg: print(f"Delivered: {msg.offset()}" if not err else f"Error: {err}")
    )

# Bir ürünü silmek için tombstone gönder
def delete_product(product_id):
    producer.produce(
        topic='product-prices',
        key=str(product_id).encode('utf-8'),
        value=None  # Tombstone mesajı
    )

producer.flush()

Compaction Durumunu İzleme ve Sorun Giderme

Compaction’ın düzgün çalışıp çalışmadığını izlemek için önce mevcut durumu kontrol edin:

# Compaction istatistiklerini görüntüle
kafka-log-dirs.sh --bootstrap-server localhost:9092 
  --topic-list user-profiles 
  --describe | python3 -m json.tool

# Cleaner thread metriklerini JMX üzerinden sorgula
kafka-run-class.sh kafka.tools.JmxTool 
  --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi 
  --object-name "kafka.log:type=LogCleaner,name=cleaner-recopy-percent" 
  --date-format "YYYY-MM-dd HH:mm:ss" 
  --reporting-interval 5000

Compaction sorunlarını teşhis etmek için Kafka log dosyalarını da inceleyebilirsiniz:

# Log segment içeriğini okuyun
kafka-dump-log.sh --files /var/kafka-logs/user-profiles-0/00000000000000000000.log 
  --print-data-log 
  --deep-iteration | head -50

# Index dosyasını kontrol edin
kafka-dump-log.sh --files /var/kafka-logs/user-profiles-0/00000000000000000000.index 
  --index-sanity-check

Önemli bir uyarı: Eğer log.cleaner.enable=false ayarını görmezden gelirseniz compaction hiç çalışmaz. Bunu mutlaka kontrol edin.

Performans Optimizasyonu ve Dikkat Edilmesi Gerekenler

Yüksek yük altında çalışan cluster’larda compaction thread’lerinin sistemi olumsuz etkilememesi için bazı optimizasyonlar yapmalısınız:

# Topic bazlı compaction agresifliğini düşürün - büyük topic'ler için
kafka-configs.sh --bootstrap-server localhost:9092 
  --entity-type topics 
  --entity-name user-profiles 
  --alter 
  --add-config 'min.cleanable.dirty.ratio=0.7,min.compaction.lag.ms=86400000'

Büyük compaction işlemleri sırasında broker’ın diğer işleri aksatmaması için I/O throttling uygulayın:

# Cleaner I/O limitini ayarla - 50MB/s
kafka-configs.sh --bootstrap-server localhost:9092 
  --entity-type brokers 
  --entity-name 1 
  --alter 
  --add-config 'log.cleaner.io.max.bytes.per.second=52428800'

Compaction sırasında dikkat edilmesi gereken kritik noktalar:

  • Offset sürekliliği korunur: Compaction sonrasında mesaj offset’leri değişmez, sadece bazı offset’lere karşılık gelen mesajlar boş olur. Consumer’larınız bunu gracefully handle etmelidir.
  • Head segment compaction’dan muaftır: Yani aktif yazma yapılan segment temizlenmez. Bu nedenle compaction’ın anlık bir etki yaratmasını beklemeyin.
  • Sıralama garantisi: Compaction aynı key’in mesajlarının sırasını korur, ancak farklı key’ler arasındaki orijinal sırayı bozabilir.
  • Consumer group offset’leri: Eğer bir consumer group henüz işlemediği bir mesaj compaction ile temizlenirse, consumer bir sonraki mevcut offset’ten devam eder. Bu veri kaybına yol açabilir, bu yüzden min.compaction.lag.ms ayarını dikkatli yapın.

Monitoring ve Alerting Kurulumu

Production ortamında compaction metriklerini Prometheus ile toplamak için JMX exporter kullanabilirsiniz:

# jmx_prometheus_config.yaml - Kafka Compaction metrikleri
rules:
  - pattern: 'kafka.log<type=LogCleaner, name=cleaner-recopy-percent><>Value'
    name: kafka_log_cleaner_recopy_percent
    help: "Log cleaner'ın yeniden kopyaladığı veri yüzdesi"
    
  - pattern: 'kafka.log<type=LogCleanerManager, name=max-dirty-percent><>Value'
    name: kafka_log_cleaner_max_dirty_percent
    help: "En kirli partition'ın dirty yüzdesi"
    
  - pattern: 'kafka.log<type=Log, topic=(.+), partition=(.+), name=LogEndOffset><>Value'
    name: kafka_log_end_offset
    labels:
      topic: "$1"
      partition: "$2"

Grafana’da izleyeceğiniz kritik metrikler şunlar olmalıdır:

  • kafka_log_cleaner_max_dirty_percent: Bu değer uzun süre yüksek kalıyorsa compaction ayaklarınız yanlış yapılandırılmıştır.
  • kafka_log_cleaner_recopy_percent: Çok yüksekse gereksiz veri kopyalanıyor demektir, buffer boyutunu artırın.
  • kafka_log_size: Compaction aktifken bile log boyutu sürekli artıyorsa yapılandırmayı gözden geçirin.

Alert kuralı örneği:

# Prometheus alert kuralı - alert_rules.yaml olarak kaydedin
groups:
  - name: kafka_compaction
    rules:
      - alert: KafkaCompactionLagging
        expr: kafka_log_cleaner_max_dirty_percent > 80
        for: 30m
        labels:
          severity: warning
        annotations:
          summary: "Kafka compaction geriden geliyor"
          description: "Max dirty percent {{ $value }}% - compaction thread sayısını artırın"
          
      - alert: KafkaLogSizeIncreasing
        expr: rate(kafka_log_size[1h]) > 0 and on(topic) kafka_topic_cleanup_policy == "compact"
        for: 2h
        labels:
          severity: critical
        annotations:
          summary: "Compact topic boyutu artıyor"

Tombstone Yönetimi ve delete.retention.ms

Tombstone mesajları compaction’dan sonra hemen silinmez. delete.retention.ms parametresi, tombstone’ların ne kadar süre tutulacağını belirler. Bu süre boyunca consumer’lar tombstone’u görür ve buna göre aksiyon alabilir:

# Tombstone retention süresini 24 saate ayarla (varsayılan 86400000 ms)
kafka-configs.sh --bootstrap-server localhost:9092 
  --entity-type topics 
  --entity-name user-profiles 
  --alter 
  --add-config 'delete.retention.ms=86400000'

Bu değeri çok düşük ayarlarsanız, bazı consumer’lar tombstone’u kaçırabilir ve silinen kaydı hala aktif sanabilir. Özellikle CDC senaryolarında bu kritik bir hataya yol açar.

Compaction ve Retention Politikalarının Birlikte Kullanımı

Gerçek dünya uygulamalarında saf compaction yerine hibrit yaklaşım daha yaygındır. Örneğin bir fintech uygulamasında hem son durumu tutmak hem de belirli bir süre geriye gidebilmek isteyebilirsiniz:

# Hibrit policy: Son 30 günü tut ve compaction uygula
kafka-configs.sh --bootstrap-server localhost:9092 
  --entity-type topics 
  --entity-name account-balances 
  --alter 
  --add-config 'cleanup.policy=compact,delete,
                retention.ms=2592000000,
                min.compaction.lag.ms=3600000,
                max.compaction.lag.ms=86400000'

Bu yapılandırmada:

  • cleanup.policy=compact,delete: Hem compaction hem de zaman bazlı silme aktif
  • retention.ms=2592000000: 30 günden eski mesajlar silinir
  • min.compaction.lag.ms=3600000: Mesajlar en az 1 saat bekledikten sonra compaction’a girer
  • max.compaction.lag.ms=86400000: Mesajlar en fazla 24 saat compaction bekleyebilir

Sonuç

Log compaction, Kafka’yı salt bir mesaj kuyruğunun ötesine taşıyan güçlü bir mekanizmadır. Doğru yapılandırıldığında hem disk kullanımını dramatik biçimde azaltır hem de uygulamalarınızın her zaman güncel duruma erişmesini sağlar. Ancak bu gücü kullanmak bazı sorumluluklar getirir.

Production ortamına geçmeden önce şu kontrol listesini uygulayın: log.cleaner.min.compaction.lag.ms değerini tüm consumer’larınızın mesajları işleyebileceği bir süreye göre ayarlayın. Tombstone yönetimini göz ardı etmeyin, silinmesi gereken kayıtların düzgün temizlendiğinden emin olun. Compaction metriklerini mutlaka izleyin, sessiz bir şekilde geriden kalan compaction thread’leri fark etmeden disk dolabilir. Son olarak hibrit retention politikalarını değerlendirin; saf compaction her senaryoya uygun değildir.

Kafka ekosistemi hızla gelişiyor ve log compaction davranışı da zaman zaman minor değişiklikler geçiriyor. Kullandığınız Kafka versiyonunun release notes’larını takip etmek, beklenmedik sürprizlerden kaçınmanızı sağlar.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir