Mesaj Kuyruğu Mimarisinde Temel Tasarım Desenleri
Dağıtık sistemlerde çalışmaya başladığınızda fark ettiğiniz ilk şeylerden biri şu olur: servisler birbirine ne kadar sıkı bağlıysa, o kadar çok baş ağrısı yaşarsınız. Biri düşer, diğeri de düşer. Biri yavaşlar, diğeri de kuyrukta bekler. İşte mesaj kuyruğu mimarileri tam bu noktada devreye giriyor. Ama “kuyruk koy araya, tamam” demek yetmiyor. Hangi deseni nerede kullanacağınızı bilmezseniz, sorunu çözmek yerine yeni sorunlar yaratırsınız. Yıllar içinde farklı ölçeklerde sistemlerde çalışırken öğrendiğim tasarım desenlerini bugün sizinle paylaşacağım.
Mesaj Kuyruğu Neden Bu Kadar Önemli?
Önce temel motivasyonu netleştirelim. Bir e-ticaret sisteminde sipariş geldiğinde ne olur? Sipariş veritabanına yazılır, stok güncellenir, ödeme işlenir, kargo firmasına bildirim gider, müşteriye e-posta atılır, muhasebe sistemine kayıt düşülür. Bunların hepsini senkron yaparsanız, kargo API’si 3 saniye cevap vermediğinde müşteri sipariş ekranında beklemeye devam eder. Üstelik o 3 saniyede başka bir şey patlarsa tüm işlemi geri almanız gerekir.
Mesaj kuyruğu bu adımları birbirinden ayırır. “Sipariş alındı” mesajı kuyruğa düşer, her servis kendi hızında bu mesajı işler. Müşteri ekranda beklemiyor, sistemler birbirinden bağımsız ölçekleniyor.
Ama işte asıl soru şu: Bu mesajları nasıl organize edeceksiniz?
Point-to-Point (Nokta Noktaya) Deseni
En basit desen bu. Bir üretici mesajı kuyruğa atar, bir tüketici alır ve işler. Mesaj bir kez tüketilir, kuyruktan silinir.
RabbitMQ ile basit bir örnek gösterelim:
# RabbitMQ kurulumu (Ubuntu)
apt-get install -y rabbitmq-server
systemctl start rabbitmq-server
systemctl enable rabbitmq-server
# Yönetim panelini aktif et
rabbitmq-plugins enable rabbitmq_management
# Kullanıcı oluştur
rabbitmqctl add_user sysadmin guclu_sifre
rabbitmqctl set_user_tags sysadmin administrator
rabbitmqctl set_permissions -p / sysadmin ".*" ".*" ".*"
Python ile üretici tarafını yazalım:
# pika kütüphanesini kur
pip install pika
# producer.py içeriği
cat << 'EOF' > /opt/queue_demo/producer.py
import pika
import json
import datetime
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='localhost',
credentials=pika.PlainCredentials('sysadmin', 'guclu_sifre')
)
)
channel = connection.channel()
# Kuyruğu durable (kalıcı) yap
channel.queue_declare(queue='siparis_kuyrugu', durable=True)
siparis = {
'id': '12345',
'musteri': 'Ahmet Yilmaz',
'tutar': 450.00,
'tarih': str(datetime.datetime.now())
}
channel.basic_publish(
exchange='',
routing_key='siparis_kuyrugu',
body=json.dumps(siparis),
properties=pika.BasicProperties(
delivery_mode=2 # Mesajı diske yaz, kalıcı olsun
)
)
print(f"Siparis kuyruğa eklendi: {siparis['id']}")
connection.close()
EOF
Bu desenin ideal olduğu durumlar:
- İş yükü dağıtımı: Birden fazla consumer aynı kuyruktan alırsa RabbitMQ round-robin dağıtır
- Sıralı işlem gerektiren görevler: Video dönüştürme, rapor oluşturma
- Yeniden işleme (retry) mantığı: Mesaj işlenemezse dead letter queue’ya düşer
Point-to-point’in en büyük tuzağı şu: Kuyruğun consumer olmadan büyümesine izin vermek. Monitoring şart.
Publish-Subscribe (Yayın-Abone) Deseni
Bir mesajın birden fazla servise ulaşması gerektiğinde publish-subscribe devreye girer. Üretici mesajı bir “topic” veya “exchange”e gönderir, abone olan herkes alır.
Apache Kafka ile bu deseni kuralım:
# Kafka ve Zookeeper servislerini başlat (docker-compose)
cat << 'EOF' > /opt/kafka/docker-compose.yml
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.4.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:7.4.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_LOG_RETENTION_HOURS: 168
EOF
docker-compose -f /opt/kafka/docker-compose.yml up -d
# Topic oluştur
docker exec -it kafka_kafka_1 kafka-topics --create
--topic kullanici-olaylari
--bootstrap-server localhost:9092
--partitions 3
--replication-factor 1
Pub/Sub’ın gücü şurada: Kullanıcı kayıt olduğunda kullanici.kayit eventi yayınlanır. E-posta servisi, bildirim servisi, CRM sistemi, analitik sistemi hepsi bu eventi bağımsız olarak tüketir. Birini kapatıp açtığınızda diğerleri etkilenmez.
# Consumer group kavramı - aynı group içindeki consumer'lar yükü paylaşır
# Farklı group'lar aynı mesajı bağımsız okur
# Bildirim servisi consumer group
kafka-console-consumer --bootstrap-server localhost:9092
--topic kullanici-olaylari
--group bildirim-servisi
--from-beginning
# Analitik servisi consumer group (aynı mesajları AYRI okur)
kafka-console-consumer --bootstrap-server localhost:9092
--topic kullanici-olaylari
--group analitik-servisi
--from-beginning
Pub/Sub ile ilgili bir deneyimimi paylaşayım: Bir projede log toplama sistemi kuruyorduk. Her uygulama sunucusu log mesajlarını Kafka’ya yazıyor, Elasticsearch consumer logları indexliyor, ayrı bir consumer alarm servisi kritik hataları yakalıyordu. Sonradan “bu logları da S3’e arşivleyelim” ihtiyacı çıktı. Tek yapmamız gereken yeni bir consumer group eklemekti. Mevcut sisteme tek satır dokunmadık.
Request-Reply (İstek-Yanıt) Deseni
Asenkron sistemlerde bazen senkron davranış simüle etmeniz gerekir. Bir servis mesaj gönderir ve yanıt bekler. Bu durumda “correlation ID” ve “reply-to” kavramları devreye girer.
# RabbitMQ ile request-reply örneği
cat << 'EOF' > /opt/queue_demo/rpc_client.py
import pika
import uuid
import json
class RpcClient:
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
self.channel = self.connection.channel()
# Geçici yanıt kuyruğu oluştur
result = self.channel.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self.on_response,
auto_ack=True
)
self.response = None
self.corr_id = None
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = json.loads(body)
def call(self, istek):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange='',
routing_key='stok_sorgulama',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=json.dumps(istek)
)
# Yanıt gelene kadar bekle (timeout ekleyin!)
self.connection.process_data_events(time_limit=5)
return self.response
rpc = RpcClient()
sonuc = rpc.call({'urun_id': 'ABC123'})
print(f"Stok durumu: {sonuc}")
EOF
Bu deseni dikkatli kullanın. Asenkron mimariye “senkron bir şey ekleyeyim” dediğinizde, aslında o noktada asenkronluğun avantajlarından vazgeçiyorsunuz. Gerçekten gerekli değilse kullanmayın. Ama kullanıcıya “sipariş sayısını hesapla” gibi anlık sonuç döndürmeniz gereken durumlarda kaçınılmaz.
Dead Letter Queue (Ölü Mektup Kuyruğu) Deseni
Bu desen bir güvenlik ağıdır. İşlenemeyen mesajların kaybolması yerine ayrı bir kuyruğa düşmesi sağlanır. Üretim ortamında bu olmadan gece uyuyamazsınız.
# RabbitMQ'da DLQ konfigürasyonu
rabbitmqadmin declare exchange name=dlx_exchange type=direct
rabbitmqadmin declare queue name=dlq_siparis
durable=true
rabbitmqadmin declare binding
source=dlx_exchange
destination=dlq_siparis
routing_key=siparis_hata
# Ana kuyruğu DLX ile tanımla
rabbitmqadmin declare queue name=siparis_kuyrugu
durable=true
arguments='{"x-dead-letter-exchange": "dlx_exchange",
"x-dead-letter-routing-key": "siparis_hata",
"x-message-ttl": 86400000,
"x-max-length": 10000}'
DLQ ile ilgili kritik bir nokta: DLQ’yu izlemek ve periyodik olarak işlemek gerekir. Sadece “düşer birikir” deyip bırakmayın.
# DLQ monitoring scripti - cron ile her 15 dakikada çalıştır
cat << 'EOF' > /opt/scripts/dlq_monitor.sh
#!/bin/bash
DLQ_COUNT=$(rabbitmqctl list_queues name messages |
grep dlq_siparis | awk '{print $2}')
THRESHOLD=100
if [ "$DLQ_COUNT" -gt "$THRESHOLD" ]; then
echo "UYARI: DLQ'da $DLQ_COUNT mesaj birikti!" |
mail -s "DLQ Alarm" [email protected]
# Slack webhook ile de bildirim gönder
curl -s -X POST -H 'Content-type: application/json'
--data "{"text":"DLQ alarm: $DLQ_COUNT mesaj işlenemedi"}"
https://hooks.slack.com/services/TOKEN
fi
EOF
chmod +x /opt/scripts/dlq_monitor.sh
echo "*/15 * * * * root /opt/scripts/dlq_monitor.sh" >> /etc/cron.d/dlq-monitor
Competing Consumers (Rekabet Eden Tüketiciler) Deseni
Bir kuyruğa birden fazla consumer bağlanarak işlem hızını artırırsınız. RabbitMQ’da bu varsayılan davranıştır ama doğru yapılandırılması önemlidir.
# Prefetch count ayarı kritik öneme sahip
# Consumer, ack göndermeden kaç mesaj alabilir?
# Düşük değer: Consumer boşta bekler
# Yüksek değer: Bir consumer yığılırken diğeri boşta kalır
cat << 'EOF' > /opt/queue_demo/consumer_config.py
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# Her consumer aynı anda sadece 1 mesaj işlesin
# İşi bitirmeden yeni mesaj almaz -> adil dağıtım
channel.basic_qos(prefetch_count=1)
def callback(ch, method, properties, body):
import json
import time
siparis = json.loads(body)
print(f"İşleniyor: {siparis['id']}")
# İşlem yap
time.sleep(1) # Simüle edilen işlem
# Başarılı: mesajı onayla
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(
queue='siparis_kuyrugu',
on_message_callback=callback
)
print("Consumer başladı, Ctrl+C ile dur")
channel.start_consuming()
EOF
Bu desende dikkat edilecek husus: Consumer’larınızı idempotent yapın. Yani aynı mesaj iki kez işlenirse sonuç değişmemeli. Ağ kesintisinde mesaj tekrar teslim edilebilir.
Saga Deseni ve Choreography
Mikroservislerde dağıtık transaction yönetimi için Saga deseni kullanılır. İki yaklaşım var: Orchestration (merkezi koordinatör) ve Choreography (her servis kendi koreografisini bilir).
Choreography tabanlı Saga’da her servis başarı veya hata eventi yayınlar:
# Kafka ile Saga choreography topic'leri
kafka-topics --create
--topic siparis-olusturuldu
--bootstrap-server localhost:9092
--partitions 3
--replication-factor 1
kafka-topics --create
--topic stok-rezerve-edildi
--bootstrap-server localhost:9092
--partitions 3
--replication-factor 1
kafka-topics --create
--topic odeme-tamamlandi
--bootstrap-server localhost:9092
--partitions 3
--replication-factor 1
kafka-topics --create
--topic siparis-iptal-edildi
--bootstrap-server localhost:9092
--partitions 3
--replication-factor 1
# Topic listesini doğrula
kafka-topics --list --bootstrap-server localhost:9092
Saga’nın akışı şöyle işler:
- Sipariş Servisi: Sipariş oluşturur,
siparis-olusturulduyayınlar - Stok Servisi:
siparis-olusturuldudinler, stoğu reserve eder,stok-rezerve-edildiyayınlar. Stok yoksastok-yetersizyayınlar - Ödeme Servisi:
stok-rezerve-edildidinler, ödemi alır,odeme-tamamlandiyayınlar. Ödeme başarısız olursaodeme-basarisizyayınlar - Stok Servisi:
odeme-basarisizdinler, rezervasyonu geri alır
Bu yaklaşımın güzelliği: Hiçbir servis diğerini doğrudan çağırmaz. Dezavantajı: Akışı takip etmek zorlaşır, distributed tracing şart olur.
Circuit Breaker ile Entegrasyon
Mesaj kuyruğu sistemlerinde consumer’ların dış servislere bağlandığı durumlarda circuit breaker pattern’ı kullanmak hayat kurtarır.
# Consumer'ın dış API çağrısı yapması ve circuit breaker
# pybreaker kütüphanesi ile örnek
pip install pybreaker pika
cat << 'EOF' > /opt/queue_demo/circuit_breaker_consumer.py
import pika
import pybreaker
import requests
import json
# Circuit breaker: 5 hata olursa 60 saniye açık kal
kargo_breaker = pybreaker.CircuitBreaker(
fail_max=5,
reset_timeout=60
)
@kargo_breaker
def kargo_api_cagir(siparis_id):
response = requests.post(
'https://kargo-api.sirket.com/siparis',
json={'siparis_id': siparis_id},
timeout=3
)
response.raise_for_status()
return response.json()
def mesaj_isle(ch, method, properties, body):
siparis = json.loads(body)
try:
sonuc = kargo_api_cagir(siparis['id'])
print(f"Kargo oluşturuldu: {sonuc}")
ch.basic_ack(delivery_tag=method.delivery_tag)
except pybreaker.CircuitBreakerError:
# Devre açık, mesajı geri koyma, DLQ'ya yönlendir
print("Kargo API devresi açık, mesaj DLQ'ya gidiyor")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
except Exception as e:
print(f"Hata: {e}, mesaj tekrar kuyruğa alınıyor")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
EOF
Partition ve Ordering Garantileri
Kafka kullanıyorsanız sıralama garantisi partition bazındadır. Aynı kullanıcıya ait tüm eventlerin sıralı işlenmesi gerekiyorsa, aynı partition key’i kullanmalısınız.
# Kafka'da partition key ile mesaj gönderme
# kullanici_id partition key olarak kullanılır
# Aynı kullanıcının tüm eventleri aynı partition'a düşer
kafka-console-producer
--broker-list localhost:9092
--topic kullanici-olaylari
--property "key.separator=:"
--property "parse.key=true" << 'EOF'
user_123:{"event":"giris","zaman":"2024-01-15T10:00:00"}
user_456:{"event":"giris","zaman":"2024-01-15T10:00:01"}
user_123:{"event":"satin_alma","zaman":"2024-01-15T10:05:00"}
user_123:{"event":"cikis","zaman":"2024-01-15T10:30:00"}
EOF
# Partition dağılımını kontrol et
kafka-consumer-groups
--bootstrap-server localhost:9092
--describe
--group analitik-servisi
Bu noktada çok yaygın bir hata görüyorum: Partition sayısını sonradan artırmak. Kafka’da partition sayısı artırılabilir ama azaltılamaz, ve artırdığınızda mevcut key-to-partition eşleşmesi bozulur. Baştan doğru partition sayısını belirleyin.
Backpressure Yönetimi
Consumer’lar üretici kadar hızlı işleyemediğinde kuyruk büyür. Backpressure mekanizması bu durumu kontrol altına alır.
# Kafka consumer lag monitoring
cat << 'EOF' > /opt/scripts/consumer_lag_check.sh
#!/bin/bash
# Consumer lag belirli eşiği geçerse uyar
LAG=$(kafka-consumer-groups
--bootstrap-server localhost:9092
--describe
--group siparis-isleme-grubu 2>/dev/null |
awk 'NR>1 && $6 ~ /^[0-9]+$/ {sum += $6} END {print sum}')
echo "Toplam consumer lag: $LAG mesaj"
if [ "$LAG" -gt 5000 ]; then
echo "KRITIK: Consumer lag çok yüksek ($LAG), yeni consumer ekleyin!" |
mail -s "Kafka Lag Alarm" [email protected]
elif [ "$LAG" -gt 1000 ]; then
echo "UYARI: Consumer lag yüksek ($LAG mesaj)"
fi
# Metric'i Prometheus'a push et
curl -s --data-binary "kafka_consumer_lag{group="siparis-isleme"} $LAG"
http://prometheus-pushgateway:9091/metrics/job/kafka_monitor
EOF
chmod +x /opt/scripts/consumer_lag_check.sh
echo "*/5 * * * * root /opt/scripts/consumer_lag_check.sh" >> /etc/cron.d/kafka-monitor
Backpressure için pratik önlemler:
- Consumer sayısını artırın: Kafka’da partition sayısına kadar consumer ekleyebilirsiniz
- Batch processing: Tek tek değil, toplu işleyin
- Rate limiting: Upstream’e “yavaşla” sinyali gönderin
- Priority queue: Kritik mesajları ayrı kuyruğa alın
Sonuç
Mesaj kuyruğu mimarisi tasarlarken “hangi deseni kullanayım” sorusundan önce “ne garantisi istiyorum” sorusunu sormalısınız.
Birden fazla servise aynı mesajı iletmeniz mi gerekiyor? Pub/Sub. Tek bir işçi işlemesi mi? Point-to-point. Yanıt beklemeniz mi gerekiyor ama senkron servis kullanamıyor musunuz? Request-reply. Dağıtık transaction mı yönetiyorsunuz? Saga.
Bu desenleri birleştirmek de mümkün ve çoğu zaman gerekli. Örneğin sipariş sisteminde hem Saga hem DLQ hem de competing consumers birlikte kullanıyorsunuz.
Son olarak şunu söyleyeyim: En iyi mesaj kuyruğu mimarisi, ekibinizin anlayabildiği, monitor edebildiği ve sorun çıktığında müdahale edebildiği mimaridir. Her şeyi Kafka ile yapmak zorunda değilsiniz. Bazı senaryolar için Redis Streams yeterlidir, bazıları için BullMQ bile yeterlidir. Araç değil, ihtiyaç odaklı düşünün.
Gece 3’te pagerduty çaldığında ne yapacağınızı bilmiyorsanız, o mimari henüz hazır değildir.
