Mesaj Kuyruğu Mimarisinde Temel Tasarım Desenleri

Dağıtık sistemlerde çalışmaya başladığınızda fark ettiğiniz ilk şeylerden biri şu olur: servisler birbirine ne kadar sıkı bağlıysa, o kadar çok baş ağrısı yaşarsınız. Biri düşer, diğeri de düşer. Biri yavaşlar, diğeri de kuyrukta bekler. İşte mesaj kuyruğu mimarileri tam bu noktada devreye giriyor. Ama “kuyruk koy araya, tamam” demek yetmiyor. Hangi deseni nerede kullanacağınızı bilmezseniz, sorunu çözmek yerine yeni sorunlar yaratırsınız. Yıllar içinde farklı ölçeklerde sistemlerde çalışırken öğrendiğim tasarım desenlerini bugün sizinle paylaşacağım.

Mesaj Kuyruğu Neden Bu Kadar Önemli?

Önce temel motivasyonu netleştirelim. Bir e-ticaret sisteminde sipariş geldiğinde ne olur? Sipariş veritabanına yazılır, stok güncellenir, ödeme işlenir, kargo firmasına bildirim gider, müşteriye e-posta atılır, muhasebe sistemine kayıt düşülür. Bunların hepsini senkron yaparsanız, kargo API’si 3 saniye cevap vermediğinde müşteri sipariş ekranında beklemeye devam eder. Üstelik o 3 saniyede başka bir şey patlarsa tüm işlemi geri almanız gerekir.

Mesaj kuyruğu bu adımları birbirinden ayırır. “Sipariş alındı” mesajı kuyruğa düşer, her servis kendi hızında bu mesajı işler. Müşteri ekranda beklemiyor, sistemler birbirinden bağımsız ölçekleniyor.

Ama işte asıl soru şu: Bu mesajları nasıl organize edeceksiniz?

Point-to-Point (Nokta Noktaya) Deseni

En basit desen bu. Bir üretici mesajı kuyruğa atar, bir tüketici alır ve işler. Mesaj bir kez tüketilir, kuyruktan silinir.

RabbitMQ ile basit bir örnek gösterelim:

# RabbitMQ kurulumu (Ubuntu)
apt-get install -y rabbitmq-server
systemctl start rabbitmq-server
systemctl enable rabbitmq-server

# Yönetim panelini aktif et
rabbitmq-plugins enable rabbitmq_management

# Kullanıcı oluştur
rabbitmqctl add_user sysadmin guclu_sifre
rabbitmqctl set_user_tags sysadmin administrator
rabbitmqctl set_permissions -p / sysadmin ".*" ".*" ".*"

Python ile üretici tarafını yazalım:

# pika kütüphanesini kur
pip install pika

# producer.py içeriği
cat << 'EOF' > /opt/queue_demo/producer.py
import pika
import json
import datetime

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='localhost',
        credentials=pika.PlainCredentials('sysadmin', 'guclu_sifre')
    )
)
channel = connection.channel()

# Kuyruğu durable (kalıcı) yap
channel.queue_declare(queue='siparis_kuyrugu', durable=True)

siparis = {
    'id': '12345',
    'musteri': 'Ahmet Yilmaz',
    'tutar': 450.00,
    'tarih': str(datetime.datetime.now())
}

channel.basic_publish(
    exchange='',
    routing_key='siparis_kuyrugu',
    body=json.dumps(siparis),
    properties=pika.BasicProperties(
        delivery_mode=2  # Mesajı diske yaz, kalıcı olsun
    )
)
print(f"Siparis kuyruğa eklendi: {siparis['id']}")
connection.close()
EOF

Bu desenin ideal olduğu durumlar:

  • İş yükü dağıtımı: Birden fazla consumer aynı kuyruktan alırsa RabbitMQ round-robin dağıtır
  • Sıralı işlem gerektiren görevler: Video dönüştürme, rapor oluşturma
  • Yeniden işleme (retry) mantığı: Mesaj işlenemezse dead letter queue’ya düşer

Point-to-point’in en büyük tuzağı şu: Kuyruğun consumer olmadan büyümesine izin vermek. Monitoring şart.

Publish-Subscribe (Yayın-Abone) Deseni

Bir mesajın birden fazla servise ulaşması gerektiğinde publish-subscribe devreye girer. Üretici mesajı bir “topic” veya “exchange”e gönderir, abone olan herkes alır.

Apache Kafka ile bu deseni kuralım:

# Kafka ve Zookeeper servislerini başlat (docker-compose)
cat << 'EOF' > /opt/kafka/docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LOG_RETENTION_HOURS: 168
EOF

docker-compose -f /opt/kafka/docker-compose.yml up -d

# Topic oluştur
docker exec -it kafka_kafka_1 kafka-topics --create 
  --topic kullanici-olaylari 
  --bootstrap-server localhost:9092 
  --partitions 3 
  --replication-factor 1

Pub/Sub’ın gücü şurada: Kullanıcı kayıt olduğunda kullanici.kayit eventi yayınlanır. E-posta servisi, bildirim servisi, CRM sistemi, analitik sistemi hepsi bu eventi bağımsız olarak tüketir. Birini kapatıp açtığınızda diğerleri etkilenmez.

# Consumer group kavramı - aynı group içindeki consumer'lar yükü paylaşır
# Farklı group'lar aynı mesajı bağımsız okur

# Bildirim servisi consumer group
kafka-console-consumer --bootstrap-server localhost:9092 
  --topic kullanici-olaylari 
  --group bildirim-servisi 
  --from-beginning

# Analitik servisi consumer group (aynı mesajları AYRI okur)
kafka-console-consumer --bootstrap-server localhost:9092 
  --topic kullanici-olaylari 
  --group analitik-servisi 
  --from-beginning

Pub/Sub ile ilgili bir deneyimimi paylaşayım: Bir projede log toplama sistemi kuruyorduk. Her uygulama sunucusu log mesajlarını Kafka’ya yazıyor, Elasticsearch consumer logları indexliyor, ayrı bir consumer alarm servisi kritik hataları yakalıyordu. Sonradan “bu logları da S3’e arşivleyelim” ihtiyacı çıktı. Tek yapmamız gereken yeni bir consumer group eklemekti. Mevcut sisteme tek satır dokunmadık.

Request-Reply (İstek-Yanıt) Deseni

Asenkron sistemlerde bazen senkron davranış simüle etmeniz gerekir. Bir servis mesaj gönderir ve yanıt bekler. Bu durumda “correlation ID” ve “reply-to” kavramları devreye girer.

# RabbitMQ ile request-reply örneği
cat << 'EOF' > /opt/queue_demo/rpc_client.py
import pika
import uuid
import json

class RpcClient:
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()
        
        # Geçici yanıt kuyruğu oluştur
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue
        
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True
        )
        self.response = None
        self.corr_id = None

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = json.loads(body)

    def call(self, istek):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        
        self.channel.basic_publish(
            exchange='',
            routing_key='stok_sorgulama',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=json.dumps(istek)
        )
        
        # Yanıt gelene kadar bekle (timeout ekleyin!)
        self.connection.process_data_events(time_limit=5)
        return self.response

rpc = RpcClient()
sonuc = rpc.call({'urun_id': 'ABC123'})
print(f"Stok durumu: {sonuc}")
EOF

Bu deseni dikkatli kullanın. Asenkron mimariye “senkron bir şey ekleyeyim” dediğinizde, aslında o noktada asenkronluğun avantajlarından vazgeçiyorsunuz. Gerçekten gerekli değilse kullanmayın. Ama kullanıcıya “sipariş sayısını hesapla” gibi anlık sonuç döndürmeniz gereken durumlarda kaçınılmaz.

Dead Letter Queue (Ölü Mektup Kuyruğu) Deseni

Bu desen bir güvenlik ağıdır. İşlenemeyen mesajların kaybolması yerine ayrı bir kuyruğa düşmesi sağlanır. Üretim ortamında bu olmadan gece uyuyamazsınız.

# RabbitMQ'da DLQ konfigürasyonu
rabbitmqadmin declare exchange name=dlx_exchange type=direct

rabbitmqadmin declare queue name=dlq_siparis 
  durable=true

rabbitmqadmin declare binding 
  source=dlx_exchange 
  destination=dlq_siparis 
  routing_key=siparis_hata

# Ana kuyruğu DLX ile tanımla
rabbitmqadmin declare queue name=siparis_kuyrugu 
  durable=true 
  arguments='{"x-dead-letter-exchange": "dlx_exchange", 
              "x-dead-letter-routing-key": "siparis_hata",
              "x-message-ttl": 86400000,
              "x-max-length": 10000}'

DLQ ile ilgili kritik bir nokta: DLQ’yu izlemek ve periyodik olarak işlemek gerekir. Sadece “düşer birikir” deyip bırakmayın.

# DLQ monitoring scripti - cron ile her 15 dakikada çalıştır
cat << 'EOF' > /opt/scripts/dlq_monitor.sh
#!/bin/bash
DLQ_COUNT=$(rabbitmqctl list_queues name messages | 
  grep dlq_siparis | awk '{print $2}')

THRESHOLD=100

if [ "$DLQ_COUNT" -gt "$THRESHOLD" ]; then
    echo "UYARI: DLQ'da $DLQ_COUNT mesaj birikti!" | 
    mail -s "DLQ Alarm" [email protected]
    
    # Slack webhook ile de bildirim gönder
    curl -s -X POST -H 'Content-type: application/json' 
        --data "{"text":"DLQ alarm: $DLQ_COUNT mesaj işlenemedi"}" 
        https://hooks.slack.com/services/TOKEN
fi
EOF

chmod +x /opt/scripts/dlq_monitor.sh
echo "*/15 * * * * root /opt/scripts/dlq_monitor.sh" >> /etc/cron.d/dlq-monitor

Competing Consumers (Rekabet Eden Tüketiciler) Deseni

Bir kuyruğa birden fazla consumer bağlanarak işlem hızını artırırsınız. RabbitMQ’da bu varsayılan davranıştır ama doğru yapılandırılması önemlidir.

# Prefetch count ayarı kritik öneme sahip
# Consumer, ack göndermeden kaç mesaj alabilir?
# Düşük değer: Consumer boşta bekler
# Yüksek değer: Bir consumer yığılırken diğeri boşta kalır

cat << 'EOF' > /opt/queue_demo/consumer_config.py
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Her consumer aynı anda sadece 1 mesaj işlesin
# İşi bitirmeden yeni mesaj almaz -> adil dağıtım
channel.basic_qos(prefetch_count=1)

def callback(ch, method, properties, body):
    import json
    import time
    
    siparis = json.loads(body)
    print(f"İşleniyor: {siparis['id']}")
    
    # İşlem yap
    time.sleep(1)  # Simüle edilen işlem
    
    # Başarılı: mesajı onayla
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(
    queue='siparis_kuyrugu',
    on_message_callback=callback
)

print("Consumer başladı, Ctrl+C ile dur")
channel.start_consuming()
EOF

Bu desende dikkat edilecek husus: Consumer’larınızı idempotent yapın. Yani aynı mesaj iki kez işlenirse sonuç değişmemeli. Ağ kesintisinde mesaj tekrar teslim edilebilir.

Saga Deseni ve Choreography

Mikroservislerde dağıtık transaction yönetimi için Saga deseni kullanılır. İki yaklaşım var: Orchestration (merkezi koordinatör) ve Choreography (her servis kendi koreografisini bilir).

Choreography tabanlı Saga’da her servis başarı veya hata eventi yayınlar:

# Kafka ile Saga choreography topic'leri
kafka-topics --create 
  --topic siparis-olusturuldu 
  --bootstrap-server localhost:9092 
  --partitions 3 
  --replication-factor 1

kafka-topics --create 
  --topic stok-rezerve-edildi 
  --bootstrap-server localhost:9092 
  --partitions 3 
  --replication-factor 1

kafka-topics --create 
  --topic odeme-tamamlandi 
  --bootstrap-server localhost:9092 
  --partitions 3 
  --replication-factor 1

kafka-topics --create 
  --topic siparis-iptal-edildi 
  --bootstrap-server localhost:9092 
  --partitions 3 
  --replication-factor 1

# Topic listesini doğrula
kafka-topics --list --bootstrap-server localhost:9092

Saga’nın akışı şöyle işler:

  • Sipariş Servisi: Sipariş oluşturur, siparis-olusturuldu yayınlar
  • Stok Servisi: siparis-olusturuldu dinler, stoğu reserve eder, stok-rezerve-edildi yayınlar. Stok yoksa stok-yetersiz yayınlar
  • Ödeme Servisi: stok-rezerve-edildi dinler, ödemi alır, odeme-tamamlandi yayınlar. Ödeme başarısız olursa odeme-basarisiz yayınlar
  • Stok Servisi: odeme-basarisiz dinler, rezervasyonu geri alır

Bu yaklaşımın güzelliği: Hiçbir servis diğerini doğrudan çağırmaz. Dezavantajı: Akışı takip etmek zorlaşır, distributed tracing şart olur.

Circuit Breaker ile Entegrasyon

Mesaj kuyruğu sistemlerinde consumer’ların dış servislere bağlandığı durumlarda circuit breaker pattern’ı kullanmak hayat kurtarır.

# Consumer'ın dış API çağrısı yapması ve circuit breaker
# pybreaker kütüphanesi ile örnek
pip install pybreaker pika

cat << 'EOF' > /opt/queue_demo/circuit_breaker_consumer.py
import pika
import pybreaker
import requests
import json

# Circuit breaker: 5 hata olursa 60 saniye açık kal
kargo_breaker = pybreaker.CircuitBreaker(
    fail_max=5,
    reset_timeout=60
)

@kargo_breaker
def kargo_api_cagir(siparis_id):
    response = requests.post(
        'https://kargo-api.sirket.com/siparis',
        json={'siparis_id': siparis_id},
        timeout=3
    )
    response.raise_for_status()
    return response.json()

def mesaj_isle(ch, method, properties, body):
    siparis = json.loads(body)
    
    try:
        sonuc = kargo_api_cagir(siparis['id'])
        print(f"Kargo oluşturuldu: {sonuc}")
        ch.basic_ack(delivery_tag=method.delivery_tag)
        
    except pybreaker.CircuitBreakerError:
        # Devre açık, mesajı geri koyma, DLQ'ya yönlendir
        print("Kargo API devresi açık, mesaj DLQ'ya gidiyor")
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        
    except Exception as e:
        print(f"Hata: {e}, mesaj tekrar kuyruğa alınıyor")
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
EOF

Partition ve Ordering Garantileri

Kafka kullanıyorsanız sıralama garantisi partition bazındadır. Aynı kullanıcıya ait tüm eventlerin sıralı işlenmesi gerekiyorsa, aynı partition key’i kullanmalısınız.

# Kafka'da partition key ile mesaj gönderme
# kullanici_id partition key olarak kullanılır
# Aynı kullanıcının tüm eventleri aynı partition'a düşer

kafka-console-producer 
  --broker-list localhost:9092 
  --topic kullanici-olaylari 
  --property "key.separator=:" 
  --property "parse.key=true" << 'EOF'
user_123:{"event":"giris","zaman":"2024-01-15T10:00:00"}
user_456:{"event":"giris","zaman":"2024-01-15T10:00:01"}
user_123:{"event":"satin_alma","zaman":"2024-01-15T10:05:00"}
user_123:{"event":"cikis","zaman":"2024-01-15T10:30:00"}
EOF

# Partition dağılımını kontrol et
kafka-consumer-groups 
  --bootstrap-server localhost:9092 
  --describe 
  --group analitik-servisi

Bu noktada çok yaygın bir hata görüyorum: Partition sayısını sonradan artırmak. Kafka’da partition sayısı artırılabilir ama azaltılamaz, ve artırdığınızda mevcut key-to-partition eşleşmesi bozulur. Baştan doğru partition sayısını belirleyin.

Backpressure Yönetimi

Consumer’lar üretici kadar hızlı işleyemediğinde kuyruk büyür. Backpressure mekanizması bu durumu kontrol altına alır.

# Kafka consumer lag monitoring
cat << 'EOF' > /opt/scripts/consumer_lag_check.sh
#!/bin/bash
# Consumer lag belirli eşiği geçerse uyar

LAG=$(kafka-consumer-groups 
  --bootstrap-server localhost:9092 
  --describe 
  --group siparis-isleme-grubu 2>/dev/null | 
  awk 'NR>1 && $6 ~ /^[0-9]+$/ {sum += $6} END {print sum}')

echo "Toplam consumer lag: $LAG mesaj"

if [ "$LAG" -gt 5000 ]; then
    echo "KRITIK: Consumer lag çok yüksek ($LAG), yeni consumer ekleyin!" | 
    mail -s "Kafka Lag Alarm" [email protected]
elif [ "$LAG" -gt 1000 ]; then
    echo "UYARI: Consumer lag yüksek ($LAG mesaj)"
fi

# Metric'i Prometheus'a push et
curl -s --data-binary "kafka_consumer_lag{group="siparis-isleme"} $LAG" 
  http://prometheus-pushgateway:9091/metrics/job/kafka_monitor
EOF

chmod +x /opt/scripts/consumer_lag_check.sh
echo "*/5 * * * * root /opt/scripts/consumer_lag_check.sh" >> /etc/cron.d/kafka-monitor

Backpressure için pratik önlemler:

  • Consumer sayısını artırın: Kafka’da partition sayısına kadar consumer ekleyebilirsiniz
  • Batch processing: Tek tek değil, toplu işleyin
  • Rate limiting: Upstream’e “yavaşla” sinyali gönderin
  • Priority queue: Kritik mesajları ayrı kuyruğa alın

Sonuç

Mesaj kuyruğu mimarisi tasarlarken “hangi deseni kullanayım” sorusundan önce “ne garantisi istiyorum” sorusunu sormalısınız.

Birden fazla servise aynı mesajı iletmeniz mi gerekiyor? Pub/Sub. Tek bir işçi işlemesi mi? Point-to-point. Yanıt beklemeniz mi gerekiyor ama senkron servis kullanamıyor musunuz? Request-reply. Dağıtık transaction mı yönetiyorsunuz? Saga.

Bu desenleri birleştirmek de mümkün ve çoğu zaman gerekli. Örneğin sipariş sisteminde hem Saga hem DLQ hem de competing consumers birlikte kullanıyorsunuz.

Son olarak şunu söyleyeyim: En iyi mesaj kuyruğu mimarisi, ekibinizin anlayabildiği, monitor edebildiği ve sorun çıktığında müdahale edebildiği mimaridir. Her şeyi Kafka ile yapmak zorunda değilsiniz. Bazı senaryolar için Redis Streams yeterlidir, bazıları için BullMQ bile yeterlidir. Araç değil, ihtiyaç odaklı düşünün.

Gece 3’te pagerduty çaldığında ne yapacağınızı bilmiyorsanız, o mimari henüz hazır değildir.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir