Python ile Kafka Producer ve Consumer Yazımı

Kafka cluster’ı ayağa kaldırdın, topic’leri oluşturdun, her şey güzel görünüyor. Peki ya uygulamandan Kafka’ya mesaj gönderip almak? İşte burada Python devreye giriyor ve çoğu zaman insanlar basit “hello world” örneklerinin ötesine geçemiyor. Bu yazıda gerçek dünyada karşılaşacağın senaryolar üzerinden, production-ready producer ve consumer yazmanın inceliklerini aktaracağım.

Neden confluent-kafka-python?

Python dünyasında Kafka için iki ana kütüphane var: kafka-python ve confluent-kafka-python. Pek çok kaynakta kafka-python öneriliyor çünkü pure Python ve kurulumu kolay. Ancak production ortamında bunu kullandığında performans sorunlarıyla karşılaşıyorsun.

confluent-kafka-python arka planda librdkafka kullanıyor. Bu C kütüphanesi, throughput ve latency açısından ciddi fark yaratıyor. Özellikle saniyede binlerce mesaj gönderdiğinde bu farkı net görüyorsun. Ben her iki kütüphaneyi de production’da kullandım, sonunda tercihim confluent-kafka oldu.

Kurulum için:

pip install confluent-kafka

Eğer Avro schema desteğine ihtiyaç duyuyorsan Schema Registry için:

pip install confluent-kafka[avro]

Temel Producer Yazımı

Önce en basit haliyle producer’ı görelim, sonra üzerine ekleyeceğiz:

from confluent_kafka import Producer
import json

conf = {
    'bootstrap.servers': 'localhost:9092',
    'client.id': 'python-producer-v1'
}

producer = Producer(conf)

def delivery_callback(err, msg):
    if err:
        print(f'Mesaj gonderilemedi: {err}')
    else:
        print(f'Mesaj gonderildi | Topic: {msg.topic()} | '
              f'Partition: {msg.partition()} | Offset: {msg.offset()}')

veri = {
    'kullanici_id': 1234,
    'eylem': 'satin_alma',
    'urun_id': 'SKU-9876',
    'timestamp': '2024-01-15T10:30:00Z'
}

producer.produce(
    topic='kullanici-eylemleri',
    key=str(veri['kullanici_id']),
    value=json.dumps(veri),
    callback=delivery_callback
)

producer.flush()

Burada dikkat edilmesi gereken birkaç nokta var. producer.flush() çağrısını unutursan buffer’daki mesajlar gönderilmeden program sonlanır. delivery_callback olmadan neyin gittiğini, neyin gitmediğini bilemezsin.

Production-Ready Producer

Gerçek hayatta bu kadar basit olmuyor. Hata yönetimi, retry mekanizması, batch gönderim gibi şeyler lazım:

from confluent_kafka import Producer, KafkaException
import json
import logging
import time
from typing import Optional, Callable

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class KafkaProducer:
    def __init__(self, bootstrap_servers: str, **ek_konfig):
        self.conf = {
            'bootstrap.servers': bootstrap_servers,
            'client.id': 'uygulama-producer',
            # Acknowledgment: 0=fire-forget, 1=leader, all=tum replikalar
            'acks': 'all',
            # Maksimum retry sayisi
            'retries': 5,
            'retry.backoff.ms': 300,
            # Batch boyutu (byte)
            'batch.size': 16384,
            # Mesajlari batch'lemek icin bekleme suresi (ms)
            'linger.ms': 10,
            # Buffer toplam boyutu
            'buffer.memory': 33554432,
            # Idempotent producer - duplicate mesaj engellemek icin
            'enable.idempotence': True,
        }
        self.conf.update(ek_konfig)
        self.producer = Producer(self.conf)
        self._pending_count = 0

    def _delivery_callback(self, err, msg):
        self._pending_count -= 1
        if err:
            logger.error(
                f'Delivery hatasi | Topic: {msg.topic()} | '
                f'Hata: {err.str()} | Kod: {err.code()}'
            )
        else:
            logger.debug(
                f'OK | Topic: {msg.topic()} | '
                f'Partition: {msg.partition()} | Offset: {msg.offset()}'
            )

    def gonder(self, topic: str, value: dict,
                key: Optional[str] = None,
                headers: Optional[dict] = None) -> bool:
        max_deneme = 3
        for deneme in range(max_deneme):
            try:
                # Poll ile onceki mesajlarin callback'lerini isle
                self.producer.poll(0)

                self.producer.produce(
                    topic=topic,
                    key=key.encode('utf-8') if key else None,
                    value=json.dumps(value, ensure_ascii=False).encode('utf-8'),
                    headers=headers,
                    callback=self._delivery_callback
                )
                self._pending_count += 1
                return True

            except BufferError:
                logger.warning(f'Buffer dolu, bekleniyor... (deneme {deneme+1})')
                self.producer.poll(1)
                time.sleep(0.1)

            except KafkaException as e:
                logger.error(f'Kafka hatasi: {e}')
                if deneme == max_deneme - 1:
                    raise
                time.sleep(0.5 * (deneme + 1))

        return False

    def flush(self, timeout: float = 30.0):
        kalan = self.producer.flush(timeout)
        if kalan > 0:
            logger.warning(f'{kalan} mesaj gonderilemedi, timeout asildi')
        return kalan

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.flush()


# Kullanim
if __name__ == '__main__':
    with KafkaProducer('localhost:9092') as producer:
        for i in range(100):
            producer.gonder(
                topic='siparis-olaylari',
                key=f'musteri-{i % 10}',
                value={'siparis_no': i, 'durum': 'olusturuldu'},
                headers={'kaynak': 'web-api', 'versiyon': 'v2'}
            )

enable.idempotence ayarı özellikle önemli. Producer retry yaptığında aynı mesajı iki kez yazmasını engelliyor. acks=all ile birlikte kullanmak zorundasın, aksi halde exception alırsın.

Temel Consumer Yazımı

Consumer tarafı producer’a göre biraz daha karmaşık çünkü offset yönetimi, partition assignment, group coordination gibi konuları düşünmek lazım:

from confluent_kafka import Consumer, KafkaError, KafkaException
import json
import signal
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'siparis-isleme-grubu',
    # earliest: en bastan oku, latest: sadece yeni mesajlari oku
    'auto.offset.reset': 'earliest',
    # Otomatik commit - dikkatli kullan!
    'enable.auto.commit': True,
    'auto.commit.interval.ms': 5000,
}

consumer = Consumer(conf)
consumer.subscribe(['siparis-olaylari'])

calis = True

def kapat(signum, frame):
    global calis
    logger.info('Kapatma sinyali alindi...')
    calis = False

signal.signal(signal.SIGINT, kapat)
signal.signal(signal.SIGTERM, kapat)

try:
    while calis:
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            continue

        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                logger.info(f'Partition sonu: {msg.partition()} @ {msg.offset()}')
            else:
                raise KafkaException(msg.error())
        else:
            veri = json.loads(msg.value().decode('utf-8'))
            logger.info(f'Mesaj alindi: {veri}')

finally:
    consumer.close()
    logger.info('Consumer kapatildi')

Manuel Offset Yönetimi

enable.auto.commit=True kullanışlı görünse de tehlikeli. Mesajı aldın, işlemeye başladın, tam bu sırada uygulaman çöktü. Kafka o mesajı commit’lenmiş sayıyor ve bir daha göndermeyecek. Veri kaybı.

Manuel commit ile bunu çözüyorsun:

from confluent_kafka import Consumer, KafkaError, TopicPartition
import json
import logging

logger = logging.getLogger(__name__)

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'guclu-consumer-grubu',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # Manuel commit
    'max.poll.interval.ms': 300000,  # 5 dakika - uzun islemler icin
    'session.timeout.ms': 30000,
}

consumer = Consumer(conf)
consumer.subscribe(['siparis-olaylari'])

def mesaji_isle(veri: dict) -> bool:
    """Gercek is mantigi burada. True: basarili, False: basarisiz"""
    try:
        # Veritabani kaydi, API cagrisi vs.
        logger.info(f"Siparis isleniyor: {veri.get('siparis_no')}")
        # Simule ediyoruz
        return True
    except Exception as e:
        logger.error(f'Isleme hatasi: {e}')
        return False

try:
    while True:
        mesajlar = consumer.consume(num_messages=50, timeout=1.0)

        for msg in mesajlar:
            if msg.error():
                if msg.error().code() != KafkaError._PARTITION_EOF:
                    logger.error(f'Consumer hatasi: {msg.error()}')
                continue

            try:
                veri = json.loads(msg.value().decode('utf-8'))
                basarili = mesaji_isle(veri)

                if basarili:
                    # Sadece basarili islenen mesajlari commit et
                    consumer.commit(message=msg, asynchronous=False)
                else:
                    # Hata durumunda ne yapacagini sen karar ver
                    # DLQ'ya gonder, log'la, alarm ver...
                    logger.warning(
                        f'Mesaj islenemedi, offset: {msg.offset()} '
                        f'partition: {msg.partition()}'
                    )

            except json.JSONDecodeError as e:
                logger.error(f'JSON parse hatasi: {e} | Mesaj: {msg.value()}')
                # Bozuk mesaji commit et, sonsuza kadar takili kalmasin
                consumer.commit(message=msg, asynchronous=False)

finally:
    consumer.close()

asynchronous=False commit performansı düşürür ama at-least-once garantisi ister. Eğer exactly-once istiyorsan işi Kafka Transactions’a getirmen gerekiyor ki bu ayrı bir konu.

Partition Assignment ve Rebalance Yönetimi

Consumer group’larda partition rebalance olduğunda dikkatli olmak lazım. Rebalance sırasında commit yapılmamış mesajlar kaybolabilir:

from confluent_kafka import Consumer, TopicPartition
import logging

logger = logging.getLogger(__name__)

class RebalanceYoneticisi:
    def __init__(self, consumer):
        self.consumer = consumer
        self.islenmekte = {}  # Partition -> islenmekte olan mesajlar

    def atandi(self, consumer, partitions):
        logger.info(f'Partition atandı: {[p.partition for p in partitions]}')
        for p in partitions:
            self.islenmekte[p.partition] = []

    def geri_alindi(self, consumer, partitions):
        logger.warning(f'Partition geri alindi: {[p.partition for p in partitions]}')
        # Commit yapilmamis offsetleri geri al
        for p in partitions:
            if p.partition in self.islenmekte:
                bekleyen = self.islenmekte.pop(p.partition, [])
                if bekleyen:
                    logger.warning(f'Partition {p.partition}: {len(bekleyen)} mesaj commit edilemedi')

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'rebalance-farkindali-grup',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
    # Rebalance protokolu: cooperative daha az kesinti yaratiyor
    'partition.assignment.strategy': 'cooperative-sticky',
}

consumer = Consumer(conf)
rebalance = RebalanceYoneticisi(consumer)

consumer.subscribe(
    ['siparis-olaylari'],
    on_assign=rebalance.atandi,
    on_revoke=rebalance.geri_alindi
)

cooperative-sticky stratejisi özellikle büyük consumer group’larda rebalance süresini ciddi ölçüde kısaltıyor. Eski eager protokolde tüm partition’lar rebalance başında bırakılıp yeniden atanıyordu. cooperative-sticky‘de sadece taşınması gereken partition’lar el değiştiriyor.

Dead Letter Queue Implementasyonu

Production’da her zaman şu soruyla karşılaşırsın: işlenemeyen mesajı ne yapacaksın? Onu tekrar tekrar deneyip consumer’ı bloke etmek yerine ayrı bir DLQ topic’ine gönder:

from confluent_kafka import Consumer, Producer, KafkaError
import json
import logging
from datetime import datetime

logger = logging.getLogger(__name__)

class DLQDestekliConsumer:
    def __init__(self, bootstrap_servers: str, group_id: str,
                 kaynak_topic: str, maks_deneme: int = 3):
        self.kaynak_topic = kaynak_topic
        self.dlq_topic = f'{kaynak_topic}.dlq'
        self.maks_deneme = maks_deneme

        self.consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id,
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': False,
        })

        self.dlq_producer = Producer({
            'bootstrap.servers': bootstrap_servers,
            'acks': 'all',
        })

        self.consumer.subscribe([kaynak_topic])

    def dlq_gonder(self, msg, hata_nedeni: str, deneme_sayisi: int):
        dlq_veri = {
            'orijinal_deger': msg.value().decode('utf-8'),
            'orijinal_topic': msg.topic(),
            'orijinal_partition': msg.partition(),
            'orijinal_offset': msg.offset(),
            'hata_nedeni': hata_nedeni,
            'deneme_sayisi': deneme_sayisi,
            'dlq_zamani': datetime.utcnow().isoformat(),
        }

        self.dlq_producer.produce(
            topic=self.dlq_topic,
            key=msg.key(),
            value=json.dumps(dlq_veri, ensure_ascii=False).encode('utf-8'),
            headers={
                'kaynak-topic': self.kaynak_topic,
                'hata-tipi': type(hata_nedeni).__name__
            }
        )
        self.dlq_producer.flush(timeout=5.0)
        logger.warning(f'Mesaj DLQ'ya gonderildi: {self.dlq_topic} | Sebep: {hata_nedeni}')

    def calistir(self):
        deneme_sayaci = {}

        try:
            while True:
                msg = self.consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() != KafkaError._PARTITION_EOF:
                        logger.error(f'Consumer hatasi: {msg.error()}')
                    continue

                mesaj_kimlik = f'{msg.partition()}-{msg.offset()}'
                deneme = deneme_sayaci.get(mesaj_kimlik, 0) + 1
                deneme_sayaci[mesaj_kimlik] = deneme

                try:
                    veri = json.loads(msg.value().decode('utf-8'))
                    # Is mantigi
                    self._isle(veri)
                    self.consumer.commit(message=msg, asynchronous=False)
                    deneme_sayaci.pop(mesaj_kimlik, None)

                except Exception as e:
                    if deneme >= self.maks_deneme:
                        logger.error(f'Maksimum deneme asildi, DLQ'ya gonderiliyor')
                        self.dlq_gonder(msg, str(e), deneme)
                        self.consumer.commit(message=msg, asynchronous=False)
                        deneme_sayaci.pop(mesaj_kimlik, None)
                    else:
                        logger.warning(f'Isleme hatasi (deneme {deneme}/{self.maks_deneme}): {e}')

        finally:
            self.consumer.close()

    def _isle(self, veri: dict):
        # Uygulamaya ozel is mantigi buraya
        pass

Performans Testinde Gerçek Sayılar

Konfigürasyon ayarları teoride iyi görünüyor ama test etmeden olmaz. İşte hızlı bir throughput testi:

from confluent_kafka import Producer
import json
import time
import uuid

conf = {
    'bootstrap.servers': 'localhost:9092',
    'acks': '1',  # Sadece leader ack - daha hizli
    'batch.size': 65536,  # 64KB batch
    'linger.ms': 5,
    'compression.type': 'snappy',  # CPU vs bandwidth tradeoff
    'buffer.memory': 67108864,  # 64MB
}

producer = Producer(conf)
mesaj_sayisi = 10000
gonderilen = [0]
baslangic = time.time()

def callback(err, msg):
    if not err:
        gonderilen[0] += 1

test_verisi = {
    'id': str(uuid.uuid4()),
    'veri': 'x' * 100,  # ~100 byte payload
}

for i in range(mesaj_sayisi):
    producer.produce(
        topic='performans-test',
        value=json.dumps(test_verisi).encode('utf-8'),
        callback=callback
    )
    if i % 1000 == 0:
        producer.poll(0)

producer.flush()
sure = time.time() - baslangic

print(f'Gonderilen: {gonderilen[0]} mesaj')
print(f'Sure: {sure:.2f} saniye')
print(f'Throughput: {gonderilen[0]/sure:.0f} mesaj/saniye')

Kendi testlerimde linger.ms=0 ile linger.ms=10 arasındaki fark neredeyse 3 kat throughput farkı olarak görünüyor. Küçük ayarlar büyük farklar yaratıyor.

Yaygın Sorunlar ve Çözümler

GroupCoordinatorNotAvailable hatası: Kafka broker’a ulaşılabiliyor ama coordinator henüz hazır değil. Consumer’ı biraz bekletip tekrar bağlan. Genellikle broker yeni başlatıldığında görülür.

UNKNOWN_MEMBER_ID hatası: Session timeout dolmuş ve consumer group’tan çıkarılmış. session.timeout.ms değerini iş mantığının süresine göre ayarla. Uzun süren işlemler için max.poll.interval.ms kritik.

Offset commit lag artıyor: Consumer işleme kapasitesi mesaj hızına yetişemiyor. Çözümler: partition sayısını artır, consumer instance sayısını artır, işleme mantığını optimize et veya async işleme kullan.

BufferError: Local: Queue full: Producer buffer’ı doldu. buffer.memory artır ya da poll() çağrılarını daha sık yap. Aynı zamanda downstream’de bir sorun varsa back pressure olarak da görünür.

Sonuç

Python ile Kafka entegrasyonu göründüğünden biraz daha ince bir iş. confluent-kafka kütüphanesini seç, manuel offset commit’i alışkanlık haline getir, DLQ mekanizmasını her production uygulamasına ekle ve rebalance senaryolarını test et.

En çok gözden kaçan şey şu: producer’da flush() çağrısını unutmak. Consumer tarafında ise enable.auto.commit=True ile başlayıp sonra neden veri kaybettiklerini anlamaya çalışan ekiplere çok denk geldim. Bu iki noktaya dikkat edersen geri kalanı hata yaptıkça öğrenilecek şeyler.

Bir sonraki adım olarak Kafka Streams veya Faust gibi stream processing kütüphanelerine bakabilirsin. Ama önce buradaki temelleri sağlam oturt. Producer ve consumer’ı iyi anlamadan stream processing yazmak, temeli olmayan bina inşa etmeye benziyor.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir