Kafka ile Exactly Once Semantics Yapılandırması

Dağıtık sistemlerde mesaj teslim garantisi üzerine konuşmak her zaman biraz felsefi bir hal alır. “En az bir kez mi, en fazla bir kez mi, tam olarak bir kez mi?” sorusu kulağa basit gelir ama Kafka ekosisteminde bu sorunun cevabı, üretim ortamınızda ciddi para kaybına ya da veri tutarsızlığına yol açabilir. Ben bu yazıda sizi teorik Kafka dokümantasyonuna yönlendirmeyeceğim. Kendi elimden geçen, ödeme sistemleri ve stok yönetimi gibi kritik iş akışlarında yaşadığım deneyimlerden hareketle Exactly Once Semantics’i (EOS) nasıl yapılandırırsınız, nerede tökezlersiniz ve nasıl düzelirsiniz, bunları konuşacağız.

Neden Exactly Once Bu Kadar Zor?

Kafka’nın varsayılan davranışına baktığınızda “at least once” yani en az bir kez teslim garantisi görürsünüz. Producer bir mesaj gönderir, broker tarafından onay (ack) gelmezse yeniden dener. Bu yeniden deneme sırasında mesaj aslında broker’a ulaşmış olabilir, sadece ack kaybolmuştur. Sonuç: duplicate message, yani yinelenen mesaj.

“At most once” ise tam tersi. Ack beklemeden devam edersiniz, mesaj kaybolabilir ama en azından duplicate olmaz. Finansal işlemler için ikisi de kabul edilemez.

Kafka 0.11 ile gelen idempotent producer ve transaction API’si bu sorunu çözüyor. Ama “Kafka’da EOS açtım, sorun bitti” demek acelecilik olur. EOS’un gerçekten işe yaraması için producer, broker ve consumer tarafında bir arada doğru konfigürasyon gerekiyor.

Temel Kavramları Netleştirelim

EOS’u anlamak için üç kavramı netleştirmek lazım:

  • Idempotent Producer: Aynı mesajı birden fazla kez gönderse bile broker tarafında sadece bir kez yazılmasını garanti eder. Producer ID (PID) ve sequence number mekanizmasıyla çalışır.
  • Transactional Producer: Birden fazla partition’a veya topic’e yazılan mesajları atomik olarak commit veya abort eder.
  • Read Committed Isolation: Consumer tarafında sadece commit edilmiş mesajları okur. Bu olmadan transaction’dan dönen mesajlar okunabilir hale gelir.

Bu üçü birlikte çalışmadığında EOS eksik kalır.

Producer Tarafında Konfigürasyon

Idempotent producer’ı aktif etmek için yapmanız gereken ilk şey enable.idempotence=true parametresini set etmek. Bu tek satır aslında arka planda birkaç şeyi otomatik olarak ayarlar.

# server.properties veya uygulama konfigürasyonu için örnek producer config
cat <<EOF > producer.properties
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
enable.idempotence=true
acks=all
retries=2147483647
max.in.flight.requests.per.connection=5
EOF

enable.idempotence=true dediğinizde Kafka otomatik olarak acks=all ve retries=Integer.MAX_VALUE değerlerini set eder. Siz bunları ayrıca yazmak zorunda değilsiniz ama açıkça yazmak production konfigürasyonunda okunabilirlik açısından iyi alışkanlık.

max.in.flight.requests.per.connection parametresi dikkat gerektiriyor. Idempotent mod için bu değer en fazla 5 olabilir. Eğer 5’ten büyük bir değer verirseniz Kafka hata fırlatır.

Transactional producer için ek olarak transactional.id tanımlamanız gerekiyor:

cat <<EOF > transactional-producer.properties
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
enable.idempotence=true
acks=all
retries=2147483647
max.in.flight.requests.per.connection=5
transactional.id=payment-service-producer-1
transaction.timeout.ms=60000
EOF

transactional.id değeri uygulamanızda benzersiz ve kararlı (stable) olmalı. Uygulama yeniden başladığında aynı transactional.id ile devam etmeli ki önceki tamamlanmamış transaction’lar temizlenebilsin. Dinamik üretilen bir UUID kullanmak bu garantiyi bozar.

Java ile Transaction Kullanımı

Eğer Java tabanlı bir servis yazıyorsanız, temel transaction akışı şöyle görünür:

// Kafka transaction örneği - Java
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("enable.idempotence", "true");
props.put("transactional.id", "order-processor-1");
props.put("acks", "all");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

try {
    producer.beginTransaction();
    
    // Birden fazla topic'e atomik yazma
    producer.send(new ProducerRecord<>("order-events", orderId, orderJson));
    producer.send(new ProducerRecord<>("inventory-updates", productId, inventoryJson));
    producer.send(new ProducerRecord<>("audit-log", orderId, auditJson));
    
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Bu hatalar kurtarılamaz, producer'ı kapat
    producer.close();
} catch (KafkaException e) {
    // Transaction'ı iptal et ve tekrar dene
    producer.abortTransaction();
}

Burada ProducerFencedException özellikle dikkat gerektiriyor. Aynı transactional.id ile iki farklı producer instance’ı ayağa kalkarsa, eski olan “fence” lanır yani etkisiz hale getirilir. Bu Kafka’nın zombie producer senaryosuna karşı korumasıdır.

Broker Tarafında Gerekli Konfigürasyon

Producer tarafını ayarladınız diyelim, broker tarafında da bazı parametreler önemli:

# broker konfigürasyonu - server.properties
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
transaction.max.timeout.ms=900000
log.retention.check.interval.ms=300000
  • transaction.state.log.replication.factor: __transaction_state topic’inin replikasyon faktörü. 3 node’lu cluster’da 3 olmalı.
  • transaction.state.log.min.isr: Minimum in-sync replica sayısı. 2 ile güvenli bir zemin kurarsınız.
  • transaction.max.timeout.ms: Bir transaction’ın alabileceği maksimum süre. 15 dakika genellikle yeterli ama uzun süren batch işlerde artırmak gerekebilir.

Bir konuya dikkat çekmek istiyorum: Eğer transaction.state.log.replication.factor değerini cluster büyüklüğünüzden yüksek set ederseniz __transaction_state topic’i oluşturulamaz ve tüm transactional producer’larınız başlangıçta hata alır. Yeni cluster kuruyorsanız bu değeri önce kontrol edin.

Consumer Tarafında Isolation Level

Transaction’ın tamamlanmamış mesajlarını consumer’ın okumaması için isolation.level parametresini read_committed olarak set etmek zorunlu:

cat <<EOF > consumer.properties
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
group.id=payment-consumer-group
isolation.level=read_committed
enable.auto.commit=false
auto.offset.reset=earliest
EOF

enable.auto.commit=false burada kritik. Eğer auto commit açık olursa, consumer mesajı işlemeden önce offset commit edebilir. Uygulama o noktada çökerse mesaj kaybolur. Manuel commit ile mesajı işledikten sonra commit edersiniz.

Python’da confluent-kafka ile şöyle görünür:

from confluent_kafka import Consumer, KafkaError

conf = {
    'bootstrap.servers': 'kafka1:9092,kafka2:9092',
    'group.id': 'payment-consumer-group',
    'isolation.level': 'read_committed',
    'enable.auto.commit': False,
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(conf)
consumer.subscribe(['payment-events'])

while True:
    msg = consumer.poll(timeout=1.0)
    
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print(f"Hata: {msg.error()}")
            break
    
    try:
        # Mesajı işle
        process_payment(msg.value().decode('utf-8'))
        # Başarılıysa offset commit et
        consumer.commit(asynchronous=False)
    except Exception as e:
        print(f"İşleme hatası: {e}")
        # Offset commit etme, bir sonraki poll'da aynı mesajı tekrar al

Kafka Streams ile EOS

Kafka Streams kullanıyorsanız EOS konfigürasyonu daha da basit hale geliyor. Streams API transaction yönetimini sizin için hallediyor:

Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "inventory-processor");
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092");
streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
                  StreamsConfig.EXACTLY_ONCE_V2);
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
streamsConfig.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);

KafkaStreams streams = new KafkaStreams(topology, streamsConfig);
streams.start();

EXACTLY_ONCE_V2 Kafka 2.6 ile geldi ve EXACTLY_ONCE‘dan daha verimli çalışıyor. Eski sürümlerde exactly_once kullanmak zorundayabilirsiniz. İkisi arasındaki temel fark: V2’de her stream thread’i ayrı transactional producer kullanır ve broker üzerindeki yük daha dengeli dağılır.

COMMIT_INTERVAL_MS_CONFIG değeri 100ms gibi düşük tutulursa throughput düşer ama latency azalır. Bunu iş gereksinimlerinize göre ayarlayın. Batch işler için 1000-5000ms arası makul bir değer.

Üretim Ortamında Karşılaşılan Gerçek Sorunlar

Transactional ID Çakışması

Kubernetes ortamında pod scaling yaptığınızda her pod’un benzersiz ve kararlı bir transactional.id‘ye ihtiyacı var. Bunu pod adıyla şöyle çözebilirsiniz:

# Kubernetes deployment ortam değişkeni
env:
- name: POD_NAME
  valueFrom:
    fieldRef:
      fieldPath: metadata.name
- name: KAFKA_TRANSACTIONAL_ID
  value: "payment-service-$(POD_NAME)"

Uygulama kodunda bu değeri okuyun:

import os

transactional_id = os.environ.get('KAFKA_TRANSACTIONAL_ID', 'payment-service-default')

producer_config = {
    'bootstrap.servers': 'kafka1:9092',
    'enable.idempotence': True,
    'transactional.id': transactional_id,
    'acks': 'all'
}

Pod silinip yeniden oluşturulduğunda aynı ismi alırsa aynı transactional.id‘yi kullanır ve önceki tamamlanmamış transaction temizlenir.

Transaction Timeout Sorunları

Uzun süren işlemlerde transaction timeout gerçek baş ağrısıdır. Bir batch işlem başlattınız, transaction.timeout.ms değerinizi geçti ve broker transaction’ı abort etti. Siz hala commit etmeye çalışıyorsunuz, TransactionAbortedException alıyorsunuz.

Çözüm için büyük batch’leri daha küçük parçalara bölmek iyi bir strateji:

// Büyük batch'i chunk'lara bölerek işle
List<List<Order>> chunks = partition(orders, 500); // 500'erlik gruplar

for (List<Order> chunk : chunks) {
    producer.beginTransaction();
    try {
        for (Order order : chunk) {
            producer.send(new ProducerRecord<>("orders", order.getId(), order.toJson()));
        }
        producer.commitTransaction();
    } catch (KafkaException e) {
        producer.abortTransaction();
        throw e;
    }
}

Monitoring ve Alerting

EOS yapılandırmasının çalıştığını doğrulamak için JMX metriklerini izlemeniz gerekiyor:

# Kafka producer transaction metrikleri - JMX üzerinden
# kafka.producer:type=producer-metrics,client-id=*

# Kritik metrikler:
# txn-abort-rate -> Yüksekse transaction problemleri var
# txn-commit-rate -> Normal commit oranı
# record-error-rate -> Sıfıra yakın olmalı
# producer-fencing-rate -> Sıfır olmalı, yüksekse zombie producer var

# Prometheus JMX exporter ile metrik çekme
cat <<EOF > kafka-producer-jmx.yml
rules:
  - pattern: 'kafka.producer<type=producer-metrics, client-id=(.+)><>(txn-abort-rate|txn-commit-rate|record-error-rate)'
    name: kafka_producer_$2
    labels:
      client_id: "$1"
EOF

txn-abort-rate‘in ani yükselişi transaction timeout veya network problemlerini işaret eder. producer-fencing-rate sıfırın üzerine çıkıyorsa aynı transactional.id‘yle yarışan birden fazla producer var demektir, deployment süreçlerinizi gözden geçirin.

EOS’un Performans Etkisi

Dürüst olmak gerekirse EOS bedava gelmiyor. Throughput üzerinde etkisi var. Kendi test ortamımda gördüğüm rakamlar şöyle:

  • At most once: Referans baseline, maksimum throughput
  • At least once (acks=all): Yaklaşık %10-15 düşüş
  • Exactly once (idempotent): At least once’dan yaklaşık %5-10 daha düşük
  • Exactly once (transactional): En fazla etki, baseline’dan %20-30 düşük

Bu rakamlar mesaj boyutuna, partition sayısına ve ağ gecikmesine göre değişir. Kritik finansal işlemler için bu tradeoff kabul edilebilir. Analitik log akışları için ise at least once’la gidip consumer tarafında idempotent işleme yapmak daha mantıklı olabilir.

Performansı optimize etmek için şu parametreleri göz önünde bulundurun:

# Producer performans optimizasyonu (EOS ile beraber)
linger.ms=5
batch.size=65536
compression.type=lz4
buffer.memory=67108864

linger.ms değerini biraz artırmak mesajları batch’lemesine yardım eder ve transaction overhead’ini dağıtır.

Test Ortamında EOS Doğrulama

Gerçekten çalışıp çalışmadığını test etmek önemli. Basit bir test senaryosu:

# EOS test senaryosu - duplicate kontrolü
from confluent_kafka import Producer, Consumer
import uuid
import json

def test_eos_no_duplicates():
    test_id = str(uuid.uuid4())
    message_count = 100
    
    # Producer - transactional
    producer_conf = {
        'bootstrap.servers': 'localhost:9092',
        'enable.idempotence': True,
        'transactional.id': f'test-producer-{test_id}',
        'acks': 'all'
    }
    
    producer = Producer(producer_conf)
    producer.init_transactions()
    
    producer.begin_transaction()
    for i in range(message_count):
        producer.produce('test-eos-topic', 
                        key=str(i), 
                        value=json.dumps({'index': i, 'test_id': test_id}))
    producer.commit_transaction()
    producer.flush()
    
    # Consumer - read_committed
    consumer_conf = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': f'test-consumer-{test_id}',
        'isolation.level': 'read_committed',
        'enable.auto.commit': False,
        'auto.offset.reset': 'earliest'
    }
    
    consumer = Consumer(consumer_conf)
    consumer.subscribe(['test-eos-topic'])
    
    received = {}
    timeout_count = 0
    
    while timeout_count < 10:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            timeout_count += 1
            continue
        
        data = json.loads(msg.value())
        if data['test_id'] == test_id:
            index = data['index']
            if index in received:
                print(f"HATA: Duplicate mesaj bulundu - index: {index}")
                return False
            received[index] = True
    
    consumer.close()
    producer.close()
    
    assert len(received) == message_count, f"Beklenen {message_count}, alınan {len(received)}"
    print(f"BAŞARILI: {message_count} mesaj duplicate olmadan teslim edildi")
    return True

test_eos_no_duplicates()

Bu test prodüksiyona çıkmadan önce CI/CD pipeline’ınıza entegre edebileceğiniz basit bir doğrulama noktası. Gerçek senaryolarda chaos engineering araçlarıyla broker’ı kasıtlı olarak düşürüp recovery sonrası duplicate olup olmadığını test etmek çok daha değerli.

Sonuç

Kafka’da Exactly Once Semantics tek bir parametre değişikliğiyle elde edilen bir özellik değil. Producer, broker ve consumer’ın koordineli şekilde yapılandırılmasını gerektiren bir sistem tasarımı meselesi. Özellikle transactional.id yönetimi, isolation level ve manual commit kombinasyonu üçünü doğru oturttuğunuzda gerçek anlamda EOS’tan söz edebilirsiniz.

Önerdiğim yaklaşım şu: Her şeyi EOS ile çalıştırmaya kalkmayın. Hangi veri akışlarının gerçekten buna ihtiyacı olduğunu belirleyin. Ödeme işlemleri, stok güncellemeleri, kritik durum değişiklikleri bunlar için EOS zorunlu. Kullanıcı davranış logları, metrik akışları gibi veriler için at least once ile idempotent consumer kombinasyonu genellikle yeterli ve çok daha performanslı.

Üretim ortamında EOS çalıştırıyorsanız txn-abort-rate ve producer-fencing-rate metriklerini her zaman izleyin. Bu iki metriğin sıfırın üzerine çıkması size sorun olduğunu hemen söyler. Kafka transaction yönetimini anlayıp doğru yapılandırdığınızda, dağıtık sistemlerde veri bütünlüğü garantisi vermek gerçekten mümkün hale geliyor.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir