Python ile Kafka Producer ve Consumer Yazımı
Kafka cluster’ı ayağa kaldırdın, topic’leri oluşturdun, her şey güzel görünüyor. Peki ya uygulamandan Kafka’ya mesaj gönderip almak? İşte burada Python devreye giriyor ve çoğu zaman insanlar basit “hello world” örneklerinin ötesine geçemiyor. Bu yazıda gerçek dünyada karşılaşacağın senaryolar üzerinden, production-ready producer ve consumer yazmanın inceliklerini aktaracağım.
Neden confluent-kafka-python?
Python dünyasında Kafka için iki ana kütüphane var: kafka-python ve confluent-kafka-python. Pek çok kaynakta kafka-python öneriliyor çünkü pure Python ve kurulumu kolay. Ancak production ortamında bunu kullandığında performans sorunlarıyla karşılaşıyorsun.
confluent-kafka-python arka planda librdkafka kullanıyor. Bu C kütüphanesi, throughput ve latency açısından ciddi fark yaratıyor. Özellikle saniyede binlerce mesaj gönderdiğinde bu farkı net görüyorsun. Ben her iki kütüphaneyi de production’da kullandım, sonunda tercihim confluent-kafka oldu.
Kurulum için:
pip install confluent-kafka
Eğer Avro schema desteğine ihtiyaç duyuyorsan Schema Registry için:
pip install confluent-kafka[avro]
Temel Producer Yazımı
Önce en basit haliyle producer’ı görelim, sonra üzerine ekleyeceğiz:
from confluent_kafka import Producer
import json
conf = {
'bootstrap.servers': 'localhost:9092',
'client.id': 'python-producer-v1'
}
producer = Producer(conf)
def delivery_callback(err, msg):
if err:
print(f'Mesaj gonderilemedi: {err}')
else:
print(f'Mesaj gonderildi | Topic: {msg.topic()} | '
f'Partition: {msg.partition()} | Offset: {msg.offset()}')
veri = {
'kullanici_id': 1234,
'eylem': 'satin_alma',
'urun_id': 'SKU-9876',
'timestamp': '2024-01-15T10:30:00Z'
}
producer.produce(
topic='kullanici-eylemleri',
key=str(veri['kullanici_id']),
value=json.dumps(veri),
callback=delivery_callback
)
producer.flush()
Burada dikkat edilmesi gereken birkaç nokta var. producer.flush() çağrısını unutursan buffer’daki mesajlar gönderilmeden program sonlanır. delivery_callback olmadan neyin gittiğini, neyin gitmediğini bilemezsin.
Production-Ready Producer
Gerçek hayatta bu kadar basit olmuyor. Hata yönetimi, retry mekanizması, batch gönderim gibi şeyler lazım:
from confluent_kafka import Producer, KafkaException
import json
import logging
import time
from typing import Optional, Callable
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class KafkaProducer:
def __init__(self, bootstrap_servers: str, **ek_konfig):
self.conf = {
'bootstrap.servers': bootstrap_servers,
'client.id': 'uygulama-producer',
# Acknowledgment: 0=fire-forget, 1=leader, all=tum replikalar
'acks': 'all',
# Maksimum retry sayisi
'retries': 5,
'retry.backoff.ms': 300,
# Batch boyutu (byte)
'batch.size': 16384,
# Mesajlari batch'lemek icin bekleme suresi (ms)
'linger.ms': 10,
# Buffer toplam boyutu
'buffer.memory': 33554432,
# Idempotent producer - duplicate mesaj engellemek icin
'enable.idempotence': True,
}
self.conf.update(ek_konfig)
self.producer = Producer(self.conf)
self._pending_count = 0
def _delivery_callback(self, err, msg):
self._pending_count -= 1
if err:
logger.error(
f'Delivery hatasi | Topic: {msg.topic()} | '
f'Hata: {err.str()} | Kod: {err.code()}'
)
else:
logger.debug(
f'OK | Topic: {msg.topic()} | '
f'Partition: {msg.partition()} | Offset: {msg.offset()}'
)
def gonder(self, topic: str, value: dict,
key: Optional[str] = None,
headers: Optional[dict] = None) -> bool:
max_deneme = 3
for deneme in range(max_deneme):
try:
# Poll ile onceki mesajlarin callback'lerini isle
self.producer.poll(0)
self.producer.produce(
topic=topic,
key=key.encode('utf-8') if key else None,
value=json.dumps(value, ensure_ascii=False).encode('utf-8'),
headers=headers,
callback=self._delivery_callback
)
self._pending_count += 1
return True
except BufferError:
logger.warning(f'Buffer dolu, bekleniyor... (deneme {deneme+1})')
self.producer.poll(1)
time.sleep(0.1)
except KafkaException as e:
logger.error(f'Kafka hatasi: {e}')
if deneme == max_deneme - 1:
raise
time.sleep(0.5 * (deneme + 1))
return False
def flush(self, timeout: float = 30.0):
kalan = self.producer.flush(timeout)
if kalan > 0:
logger.warning(f'{kalan} mesaj gonderilemedi, timeout asildi')
return kalan
def __enter__(self):
return self
def __exit__(self, *args):
self.flush()
# Kullanim
if __name__ == '__main__':
with KafkaProducer('localhost:9092') as producer:
for i in range(100):
producer.gonder(
topic='siparis-olaylari',
key=f'musteri-{i % 10}',
value={'siparis_no': i, 'durum': 'olusturuldu'},
headers={'kaynak': 'web-api', 'versiyon': 'v2'}
)
enable.idempotence ayarı özellikle önemli. Producer retry yaptığında aynı mesajı iki kez yazmasını engelliyor. acks=all ile birlikte kullanmak zorundasın, aksi halde exception alırsın.
Temel Consumer Yazımı
Consumer tarafı producer’a göre biraz daha karmaşık çünkü offset yönetimi, partition assignment, group coordination gibi konuları düşünmek lazım:
from confluent_kafka import Consumer, KafkaError, KafkaException
import json
import signal
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
conf = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'siparis-isleme-grubu',
# earliest: en bastan oku, latest: sadece yeni mesajlari oku
'auto.offset.reset': 'earliest',
# Otomatik commit - dikkatli kullan!
'enable.auto.commit': True,
'auto.commit.interval.ms': 5000,
}
consumer = Consumer(conf)
consumer.subscribe(['siparis-olaylari'])
calis = True
def kapat(signum, frame):
global calis
logger.info('Kapatma sinyali alindi...')
calis = False
signal.signal(signal.SIGINT, kapat)
signal.signal(signal.SIGTERM, kapat)
try:
while calis:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
logger.info(f'Partition sonu: {msg.partition()} @ {msg.offset()}')
else:
raise KafkaException(msg.error())
else:
veri = json.loads(msg.value().decode('utf-8'))
logger.info(f'Mesaj alindi: {veri}')
finally:
consumer.close()
logger.info('Consumer kapatildi')
Manuel Offset Yönetimi
enable.auto.commit=True kullanışlı görünse de tehlikeli. Mesajı aldın, işlemeye başladın, tam bu sırada uygulaman çöktü. Kafka o mesajı commit’lenmiş sayıyor ve bir daha göndermeyecek. Veri kaybı.
Manuel commit ile bunu çözüyorsun:
from confluent_kafka import Consumer, KafkaError, TopicPartition
import json
import logging
logger = logging.getLogger(__name__)
conf = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'guclu-consumer-grubu',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False, # Manuel commit
'max.poll.interval.ms': 300000, # 5 dakika - uzun islemler icin
'session.timeout.ms': 30000,
}
consumer = Consumer(conf)
consumer.subscribe(['siparis-olaylari'])
def mesaji_isle(veri: dict) -> bool:
"""Gercek is mantigi burada. True: basarili, False: basarisiz"""
try:
# Veritabani kaydi, API cagrisi vs.
logger.info(f"Siparis isleniyor: {veri.get('siparis_no')}")
# Simule ediyoruz
return True
except Exception as e:
logger.error(f'Isleme hatasi: {e}')
return False
try:
while True:
mesajlar = consumer.consume(num_messages=50, timeout=1.0)
for msg in mesajlar:
if msg.error():
if msg.error().code() != KafkaError._PARTITION_EOF:
logger.error(f'Consumer hatasi: {msg.error()}')
continue
try:
veri = json.loads(msg.value().decode('utf-8'))
basarili = mesaji_isle(veri)
if basarili:
# Sadece basarili islenen mesajlari commit et
consumer.commit(message=msg, asynchronous=False)
else:
# Hata durumunda ne yapacagini sen karar ver
# DLQ'ya gonder, log'la, alarm ver...
logger.warning(
f'Mesaj islenemedi, offset: {msg.offset()} '
f'partition: {msg.partition()}'
)
except json.JSONDecodeError as e:
logger.error(f'JSON parse hatasi: {e} | Mesaj: {msg.value()}')
# Bozuk mesaji commit et, sonsuza kadar takili kalmasin
consumer.commit(message=msg, asynchronous=False)
finally:
consumer.close()
asynchronous=False commit performansı düşürür ama at-least-once garantisi ister. Eğer exactly-once istiyorsan işi Kafka Transactions’a getirmen gerekiyor ki bu ayrı bir konu.
Partition Assignment ve Rebalance Yönetimi
Consumer group’larda partition rebalance olduğunda dikkatli olmak lazım. Rebalance sırasında commit yapılmamış mesajlar kaybolabilir:
from confluent_kafka import Consumer, TopicPartition
import logging
logger = logging.getLogger(__name__)
class RebalanceYoneticisi:
def __init__(self, consumer):
self.consumer = consumer
self.islenmekte = {} # Partition -> islenmekte olan mesajlar
def atandi(self, consumer, partitions):
logger.info(f'Partition atandı: {[p.partition for p in partitions]}')
for p in partitions:
self.islenmekte[p.partition] = []
def geri_alindi(self, consumer, partitions):
logger.warning(f'Partition geri alindi: {[p.partition for p in partitions]}')
# Commit yapilmamis offsetleri geri al
for p in partitions:
if p.partition in self.islenmekte:
bekleyen = self.islenmekte.pop(p.partition, [])
if bekleyen:
logger.warning(f'Partition {p.partition}: {len(bekleyen)} mesaj commit edilemedi')
conf = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'rebalance-farkindali-grup',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
# Rebalance protokolu: cooperative daha az kesinti yaratiyor
'partition.assignment.strategy': 'cooperative-sticky',
}
consumer = Consumer(conf)
rebalance = RebalanceYoneticisi(consumer)
consumer.subscribe(
['siparis-olaylari'],
on_assign=rebalance.atandi,
on_revoke=rebalance.geri_alindi
)
cooperative-sticky stratejisi özellikle büyük consumer group’larda rebalance süresini ciddi ölçüde kısaltıyor. Eski eager protokolde tüm partition’lar rebalance başında bırakılıp yeniden atanıyordu. cooperative-sticky‘de sadece taşınması gereken partition’lar el değiştiriyor.
Dead Letter Queue Implementasyonu
Production’da her zaman şu soruyla karşılaşırsın: işlenemeyen mesajı ne yapacaksın? Onu tekrar tekrar deneyip consumer’ı bloke etmek yerine ayrı bir DLQ topic’ine gönder:
from confluent_kafka import Consumer, Producer, KafkaError
import json
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
class DLQDestekliConsumer:
def __init__(self, bootstrap_servers: str, group_id: str,
kaynak_topic: str, maks_deneme: int = 3):
self.kaynak_topic = kaynak_topic
self.dlq_topic = f'{kaynak_topic}.dlq'
self.maks_deneme = maks_deneme
self.consumer = Consumer({
'bootstrap.servers': bootstrap_servers,
'group.id': group_id,
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
})
self.dlq_producer = Producer({
'bootstrap.servers': bootstrap_servers,
'acks': 'all',
})
self.consumer.subscribe([kaynak_topic])
def dlq_gonder(self, msg, hata_nedeni: str, deneme_sayisi: int):
dlq_veri = {
'orijinal_deger': msg.value().decode('utf-8'),
'orijinal_topic': msg.topic(),
'orijinal_partition': msg.partition(),
'orijinal_offset': msg.offset(),
'hata_nedeni': hata_nedeni,
'deneme_sayisi': deneme_sayisi,
'dlq_zamani': datetime.utcnow().isoformat(),
}
self.dlq_producer.produce(
topic=self.dlq_topic,
key=msg.key(),
value=json.dumps(dlq_veri, ensure_ascii=False).encode('utf-8'),
headers={
'kaynak-topic': self.kaynak_topic,
'hata-tipi': type(hata_nedeni).__name__
}
)
self.dlq_producer.flush(timeout=5.0)
logger.warning(f'Mesaj DLQ'ya gonderildi: {self.dlq_topic} | Sebep: {hata_nedeni}')
def calistir(self):
deneme_sayaci = {}
try:
while True:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() != KafkaError._PARTITION_EOF:
logger.error(f'Consumer hatasi: {msg.error()}')
continue
mesaj_kimlik = f'{msg.partition()}-{msg.offset()}'
deneme = deneme_sayaci.get(mesaj_kimlik, 0) + 1
deneme_sayaci[mesaj_kimlik] = deneme
try:
veri = json.loads(msg.value().decode('utf-8'))
# Is mantigi
self._isle(veri)
self.consumer.commit(message=msg, asynchronous=False)
deneme_sayaci.pop(mesaj_kimlik, None)
except Exception as e:
if deneme >= self.maks_deneme:
logger.error(f'Maksimum deneme asildi, DLQ'ya gonderiliyor')
self.dlq_gonder(msg, str(e), deneme)
self.consumer.commit(message=msg, asynchronous=False)
deneme_sayaci.pop(mesaj_kimlik, None)
else:
logger.warning(f'Isleme hatasi (deneme {deneme}/{self.maks_deneme}): {e}')
finally:
self.consumer.close()
def _isle(self, veri: dict):
# Uygulamaya ozel is mantigi buraya
pass
Performans Testinde Gerçek Sayılar
Konfigürasyon ayarları teoride iyi görünüyor ama test etmeden olmaz. İşte hızlı bir throughput testi:
from confluent_kafka import Producer
import json
import time
import uuid
conf = {
'bootstrap.servers': 'localhost:9092',
'acks': '1', # Sadece leader ack - daha hizli
'batch.size': 65536, # 64KB batch
'linger.ms': 5,
'compression.type': 'snappy', # CPU vs bandwidth tradeoff
'buffer.memory': 67108864, # 64MB
}
producer = Producer(conf)
mesaj_sayisi = 10000
gonderilen = [0]
baslangic = time.time()
def callback(err, msg):
if not err:
gonderilen[0] += 1
test_verisi = {
'id': str(uuid.uuid4()),
'veri': 'x' * 100, # ~100 byte payload
}
for i in range(mesaj_sayisi):
producer.produce(
topic='performans-test',
value=json.dumps(test_verisi).encode('utf-8'),
callback=callback
)
if i % 1000 == 0:
producer.poll(0)
producer.flush()
sure = time.time() - baslangic
print(f'Gonderilen: {gonderilen[0]} mesaj')
print(f'Sure: {sure:.2f} saniye')
print(f'Throughput: {gonderilen[0]/sure:.0f} mesaj/saniye')
Kendi testlerimde linger.ms=0 ile linger.ms=10 arasındaki fark neredeyse 3 kat throughput farkı olarak görünüyor. Küçük ayarlar büyük farklar yaratıyor.
Yaygın Sorunlar ve Çözümler
GroupCoordinatorNotAvailable hatası: Kafka broker’a ulaşılabiliyor ama coordinator henüz hazır değil. Consumer’ı biraz bekletip tekrar bağlan. Genellikle broker yeni başlatıldığında görülür.
UNKNOWN_MEMBER_ID hatası: Session timeout dolmuş ve consumer group’tan çıkarılmış. session.timeout.ms değerini iş mantığının süresine göre ayarla. Uzun süren işlemler için max.poll.interval.ms kritik.
Offset commit lag artıyor: Consumer işleme kapasitesi mesaj hızına yetişemiyor. Çözümler: partition sayısını artır, consumer instance sayısını artır, işleme mantığını optimize et veya async işleme kullan.
BufferError: Local: Queue full: Producer buffer’ı doldu. buffer.memory artır ya da poll() çağrılarını daha sık yap. Aynı zamanda downstream’de bir sorun varsa back pressure olarak da görünür.
Sonuç
Python ile Kafka entegrasyonu göründüğünden biraz daha ince bir iş. confluent-kafka kütüphanesini seç, manuel offset commit’i alışkanlık haline getir, DLQ mekanizmasını her production uygulamasına ekle ve rebalance senaryolarını test et.
En çok gözden kaçan şey şu: producer’da flush() çağrısını unutmak. Consumer tarafında ise enable.auto.commit=True ile başlayıp sonra neden veri kaybettiklerini anlamaya çalışan ekiplere çok denk geldim. Bu iki noktaya dikkat edersen geri kalanı hata yaptıkça öğrenilecek şeyler.
Bir sonraki adım olarak Kafka Streams veya Faust gibi stream processing kütüphanelerine bakabilirsin. Ama önce buradaki temelleri sağlam oturt. Producer ve consumer’ı iyi anlamadan stream processing yazmak, temeli olmayan bina inşa etmeye benziyor.
