RabbitMQ Dead Letter Queue ile Hata Yönetimi

Üretim ortamında bir mesaj işlenemediğinde ne olur? Eğer doğru bir mekanizma kurulmamışsa, o mesaj ya kaybolur ya da sistemi sonsuza kadar meşgul eder. İkisi de felaket. RabbitMQ’nun Dead Letter Queue (DLQ) mekanizması tam da bu sorunu çözmek için var, ama doğru yapılandırılmadığında “kuruldu, tamam” demekten öteye geçemiyor. Bu yazıda DLQ’yu gerçekten işe yarar hale getirmenin yollarını, üretimde karşılaştığım senaryolarla birlikte anlatacağım.

Dead Letter Queue Nedir ve Neden Önemlidir

RabbitMQ’da bir mesaj üç durumda “dead letter” haline gelir:

  • Reddedilme (rejected/nacked): Consumer mesajı basic.reject veya basic.nack ile reddeder ve requeue=false parametresi kullanılır
  • TTL aşımı: Mesajın yaşam süresi (Time-To-Live) dolmuştur
  • Kuyruk dolmuştur: Kuyruk kapasitesi (x-max-length) aşılmıştır

Bu üç durum gerçek hayatta sürekli yaşanır. Bir e-ticaret sisteminde sipariş işleme servisi çöktüğünde, yüzlerce sipariş mesajı kuyrukta bekler. Servis yeniden ayağa kalktığında bu mesajları nasıl işleyeceksiniz? Hangisi gerçekten bozuk, hangisi geçici bir ağ hatasından dolayı başarısız oldu? DLQ olmadan bu soruların cevabını bulmak, karanlıkta iğne aramaktan farklı değil.

Temel Yapılandırma: Exchange ve Queue İlişkisi

DLQ yapısı iki bileşenden oluşur: bir dead letter exchange (DLX) ve bu exchange’e bağlı bir veya daha fazla dead letter kuyruk. Ana kuyruğu oluştururken DLX’i argüman olarak belirtiyoruz.

# RabbitMQ Management API ile exchange oluşturma
curl -u guest:guest -X PUT 
  http://localhost:15672/api/exchanges/%2F/orders.dlx 
  -H "Content-Type: application/json" 
  -d '{"type":"direct","durable":true}'
# Dead letter kuyruğunu oluşturma
curl -u guest:guest -X PUT 
  http://localhost:15672/api/queues/%2F/orders.dead 
  -H "Content-Type: application/json" 
  -d '{
    "durable": true,
    "arguments": {}
  }'
# DLX ile ana kuyruğu oluşturma
curl -u guest:guest -X PUT 
  http://localhost:15672/api/queues/%2F/orders.processing 
  -H "Content-Type: application/json" 
  -d '{
    "durable": true,
    "arguments": {
      "x-dead-letter-exchange": "orders.dlx",
      "x-dead-letter-routing-key": "orders.dead",
      "x-message-ttl": 300000,
      "x-max-length": 10000
    }
  }'

Burada x-message-ttl değerini 300000 milisaniye (5 dakika) olarak ayarladım. Bu değer sisteminizin doğasına göre değişmeli. Gerçek zamanlı ödeme sistemlerinde bu süreyi çok daha kısa tutuyoruz, 30-60 saniye gibi. Raporlama sistemlerinde ise saatlerle ifade edilebilir.

# Binding oluşturma: DLX'i dead letter kuyruğuna bağlama
curl -u guest:guest -X POST 
  http://localhost:15672/api/bindings/%2F/e/orders.dlx/q/orders.dead 
  -H "Content-Type: application/json" 
  -d '{"routing_key":"orders.dead","arguments":{}}'

Python ile Consumer ve DLQ İmplementasyonu

Şimdi bu yapıyı gerçek bir uygulama üzerinde gösterelim. Aşağıdaki örnek, sipariş işleme senaryosunu simüle ediyor.

import pika
import json
import logging
import time
from datetime import datetime

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

def get_connection():
    credentials = pika.PlainCredentials('guest', 'guest')
    parameters = pika.ConnectionParameters(
        host='localhost',
        port=5672,
        virtual_host='/',
        credentials=credentials,
        heartbeat=600,
        blocked_connection_timeout=300
    )
    return pika.BlockingConnection(parameters)

def setup_infrastructure(channel):
    # Dead letter exchange
    channel.exchange_declare(
        exchange='orders.dlx',
        exchange_type='direct',
        durable=True
    )
    
    # Dead letter kuyrugu
    channel.queue_declare(
        queue='orders.dead',
        durable=True
    )
    
    # DLX ile ana kuyruk
    channel.queue_declare(
        queue='orders.processing',
        durable=True,
        arguments={
            'x-dead-letter-exchange': 'orders.dlx',
            'x-dead-letter-routing-key': 'orders.dead',
            'x-message-ttl': 300000,
            'x-max-length': 10000
        }
    )
    
    # Binding
    channel.queue_bind(
        exchange='orders.dlx',
        queue='orders.dead',
        routing_key='orders.dead'
    )
    
    logger.info("Altyapi basariyla olusturuldu")

def process_order(order_data):
    """
    Gercek siparis isleme mantigi burada.
    Hata durumlarini simule ediyoruz.
    """
    order_id = order_data.get('order_id')
    amount = order_data.get('amount', 0)
    
    # Belirli siparisler icin hata simule et
    if amount < 0:
        raise ValueError(f"Gecersiz siparis tutari: {amount}")
    
    if order_data.get('simulate_timeout'):
        time.sleep(10)
    
    logger.info(f"Siparis islendi: {order_id}, Tutar: {amount}")
    return True

def callback(channel, method, properties, body):
    try:
        order_data = json.loads(body.decode('utf-8'))
        logger.info(f"Mesaj alindi: {order_data.get('order_id')}")
        
        # Mesajin kac kez denendigini kontrol et
        retry_count = 0
        if properties.headers:
            death_header = properties.headers.get('x-death', [])
            if death_header:
                retry_count = death_header[0].get('count', 0)
        
        if retry_count >= 3:
            logger.error(
                f"Maksimum deneme sayisina ulasildi: {order_data.get('order_id')}"
            )
            channel.basic_reject(
                delivery_tag=method.delivery_tag,
                requeue=False
            )
            return
        
        process_order(order_data)
        channel.basic_ack(delivery_tag=method.delivery_tag)
        
    except ValueError as e:
        logger.error(f"Kalici hata, DLQ'ya gonderiliyor: {e}")
        channel.basic_reject(
            delivery_tag=method.delivery_tag,
            requeue=False
        )
    except Exception as e:
        logger.warning(f"Gecici hata, yeniden kuyruga alinacak: {e}")
        channel.basic_nack(
            delivery_tag=method.delivery_tag,
            requeue=False
        )

def main():
    connection = get_connection()
    channel = connection.channel()
    
    setup_infrastructure(channel)
    
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(
        queue='orders.processing',
        on_message_callback=callback
    )
    
    logger.info("Consumer baslatildi, mesajlar bekleniyor...")
    
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()
    finally:
        connection.close()

if __name__ == '__main__':
    main()

Bu kodda dikkat edilmesi gereken önemli bir nokta var: x-death header’ı. RabbitMQ bir mesaj dead letter’a düştüğünde bu header’ı otomatik olarak ekliyor. İçinde mesajın kaç kez “öldüğü”, hangi kuyruktan geldiği ve sebebi gibi bilgiler yer alıyor. Bu bilgiyi okuyarak akıllı retry mekanizmaları kurabilirsiniz.

Delayed Retry Mekanizması

Basit reject/nack yerine daha sofistike bir yaklaşım olan “delayed retry” (gecikmeli yeniden deneme) çok daha sağlıklı sonuçlar veriyor. Bunun için rabbitmq-delayed-message-exchange plugin’ini kullanacağız.

# Plugin'i etkinlestir
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

# Delayed exchange olustur
curl -u guest:guest -X PUT 
  http://localhost:15672/api/exchanges/%2F/orders.retry 
  -H "Content-Type: application/json" 
  -d '{
    "type":"x-delayed-message",
    "durable":true,
    "arguments":{
      "x-delayed-type":"direct"
    }
  }'
def publish_with_retry_delay(channel, order_data, retry_count):
    """
    Exponential backoff ile yeniden deneme:
    1. deneme: 30 saniye
    2. deneme: 60 saniye  
    3. deneme: 120 saniye
    """
    delay_seconds = 30 * (2 ** retry_count)
    delay_ms = delay_seconds * 1000
    
    # Maksimum gecikmeyi 10 dakika ile sinirla
    delay_ms = min(delay_ms, 600000)
    
    order_data['retry_count'] = retry_count + 1
    order_data['last_retry_at'] = datetime.now().isoformat()
    
    channel.basic_publish(
        exchange='orders.retry',
        routing_key='orders.processing',
        body=json.dumps(order_data),
        properties=pika.BasicProperties(
            delivery_mode=2,  # Mesaji kalici yap
            headers={
                'x-delay': delay_ms,
                'x-retry-count': retry_count + 1
            }
        )
    )
    
    logger.info(
        f"Mesaj {delay_seconds} saniye sonra yeniden denenecek: "
        f"{order_data.get('order_id')}"
    )

Exponential backoff burada kritik. Bir servis geçici olarak çöktüğünde, tüm mesajları aynı anda tekrar göndermek “thundering herd” problemine yol açar. 30-60-120 saniye gibi artan aralıklarla yeniden deneme, sisteme nefes alma fırsatı verir.

DLQ Monitoring ve Alerting

DLQ’yu kurmak yeterli değil. Üretimde ne zaman mesajların dead letter’a düştüğünü anlık bilmek gerekiyor.

#!/bin/bash
# dlq_monitor.sh - DLQ mesaj sayisini kontrol eder ve uyari gonderir

RABBIT_HOST="localhost"
RABBIT_PORT="15672"
RABBIT_USER="admin"
RABBIT_PASS="secretpassword"
QUEUE_NAME="orders.dead"
THRESHOLD=10
ALERT_WEBHOOK="https://hooks.slack.com/services/YOUR/WEBHOOK/URL"

get_queue_depth() {
    curl -s -u "${RABBIT_USER}:${RABBIT_PASS}" 
        "http://${RABBIT_HOST}:${RABBIT_PORT}/api/queues/%2F/${QUEUE_NAME}" 
        | python3 -c "import sys,json; data=json.load(sys.stdin); print(data.get('messages', 0))"
}

send_slack_alert() {
    local message=$1
    curl -s -X POST 
        -H 'Content-type: application/json' 
        --data "{"text":"${message}"}" 
        "${ALERT_WEBHOOK}"
}

CURRENT_DEPTH=$(get_queue_depth)

if [ "${CURRENT_DEPTH}" -gt "${THRESHOLD}" ]; then
    ALERT_MSG="UYARI: DLQ '${QUEUE_NAME}' kuyruğunda ${CURRENT_DEPTH} mesaj birikti! Threshold: ${THRESHOLD}"
    logger -t dlq_monitor "${ALERT_MSG}"
    send_slack_alert "${ALERT_MSG}"
    echo "$(date): ${ALERT_MSG}" >> /var/log/rabbitmq/dlq_alerts.log
fi

echo "$(date): DLQ derinligi: ${CURRENT_DEPTH}" >> /var/log/rabbitmq/dlq_depth.log
# Cron ile her 5 dakikada bir calistir
*/5 * * * * /usr/local/bin/dlq_monitor.sh

Bu script’i Prometheus + Grafana ile de entegre edebilirsiniz. RabbitMQ’nun rabbitmq_prometheus plugin’i sayesinde tüm metrikler otomatik olarak Prometheus’a akar, oradan Grafana alert’leri kurabilirsiniz. Ancak basit bir monitoring için bu bash yaklaşımı hala işlevselliğini koruyor.

DLQ’daki Mesajları İşleme ve Requeue

Dead letter kuyruğuna düşmüş mesajları analiz edip yeniden işlemeye sokmak için bir yönetim aracına ihtiyacınız var.

#!/usr/bin/env python3
"""
dlq_replay.py - DLQ'daki mesajlari analiz eder ve
secici olarak ana kuyruğa geri gonderir
"""

import pika
import json
import argparse
from datetime import datetime

def connect():
    credentials = pika.PlainCredentials('admin', 'secretpassword')
    params = pika.ConnectionParameters(
        host='localhost',
        credentials=credentials
    )
    return pika.BlockingConnection(params)

def analyze_dlq(max_messages=100):
    """DLQ'daki mesajlari analiz eder, silmez"""
    connection = connect()
    channel = connection.channel()
    
    messages = []
    error_summary = {}
    
    for _ in range(max_messages):
        method, properties, body = channel.basic_get(
            queue='orders.dead',
            auto_ack=False
        )
        
        if method is None:
            break
        
        try:
            data = json.loads(body.decode('utf-8'))
            
            # x-death header'indan hata nedenini cikar
            death_info = {}
            if properties.headers and 'x-death' in properties.headers:
                death_entry = properties.headers['x-death'][0]
                death_info = {
                    'reason': death_entry.get('reason', 'unknown'),
                    'queue': death_entry.get('queue', 'unknown'),
                    'count': death_entry.get('count', 0),
                    'time': str(death_entry.get('time', ''))
                }
                
                reason = death_info['reason']
                error_summary[reason] = error_summary.get(reason, 0) + 1
            
            messages.append({
                'data': data,
                'death_info': death_info,
                'delivery_tag': method.delivery_tag
            })
            
        except json.JSONDecodeError as e:
            print(f"JSON parse hatasi: {e}")
    
    # Mesajlari nack ile geri koy (silme)
    for msg in messages:
        channel.basic_nack(
            delivery_tag=msg['delivery_tag'],
            requeue=True
        )
    
    connection.close()
    
    print("n--- DLQ Analiz Raporu ---")
    print(f"Toplam incelenen mesaj: {len(messages)}")
    print("nHata dagılımı:")
    for reason, count in error_summary.items():
        print(f"  {reason}: {count} mesaj")
    
    return messages

def replay_messages(filter_func=None, dry_run=True):
    """
    DLQ'daki mesajlari ana kuyruğa geri gonder.
    filter_func: True dondurenler replay edilir
    dry_run: True ise gercekten gondermez, sadece rapor eder
    """
    connection = connect()
    channel = connection.channel()
    
    replayed = 0
    skipped = 0
    
    while True:
        method, properties, body = channel.basic_get(
            queue='orders.dead',
            auto_ack=False
        )
        
        if method is None:
            break
        
        data = json.loads(body.decode('utf-8'))
        
        should_replay = True
        if filter_func:
            should_replay = filter_func(data)
        
        if should_replay:
            if not dry_run:
                channel.basic_publish(
                    exchange='',
                    routing_key='orders.processing',
                    body=body,
                    properties=pika.BasicProperties(
                        delivery_mode=2,
                        headers={'x-replayed-at': datetime.now().isoformat()}
                    )
                )
                channel.basic_ack(delivery_tag=method.delivery_tag)
            else:
                print(f"[DRY RUN] Replay edilecek: {data.get('order_id')}")
                channel.basic_nack(
                    delivery_tag=method.delivery_tag,
                    requeue=True
                )
            replayed += 1
        else:
            channel.basic_nack(
                delivery_tag=method.delivery_tag,
                requeue=True
            )
            skipped += 1
    
    connection.close()
    print(f"nSonuc: {replayed} replay, {skipped} atlandı")

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--action', choices=['analyze', 'replay'], required=True)
    parser.add_argument('--dry-run', action='store_true', default=True)
    args = parser.parse_args()
    
    if args.action == 'analyze':
        analyze_dlq()
    elif args.action == 'replay':
        replay_messages(dry_run=args.dry_run)

dry_run parametresi bu araçta çok önemli. Üretimde bir şeyleri geri göndermeden önce ne gönderdiğinizi kesinlikle bilmeniz gerekiyor. Bir keresinde dry_run olmadan doğrudan replay çalıştıran bir mühendis, sistemdeki bozuk bir data migration’ın ürettiği 50.000 mesajı ana kuyruğa geri gönderdi. Sonuç? İki saatlik downtime.

Gerçek Dünya Senaryosu: E-ticaret Ödeme Akışı

Somut bir örnek üzerinden DLQ tasarımını ele alalım. Bir ödeme işleme sisteminde şu kuyruklara ihtiyaç duyuyoruz:

  • payment.incoming: Yeni ödeme talepleri
  • payment.validation: Kart doğrulama
  • payment.processing: Gerçek ödeme işlemi
  • payment.dead: Tüm aşamalardaki başarısız mesajlar

Her kuyruğun farklı TTL ve max-length değerlerine ihtiyacı var:

  • payment.incoming için TTL: 60 saniye (ödeme talepleri bayat olmamalı)
  • payment.validation için TTL: 30 saniye (doğrulama hızlı olmalı)
  • payment.processing için TTL: 120 saniye (banka API’si yavaş olabilir)

DLQ’ya düşen bir ödeme mesajının x-death header’ında hangi aşamada başarısız olduğu, kaç kez denendiği ve zaman damgası yer alıyor. Bu bilgiyle hem müşteriye doğru bir hata mesajı gösterebilir hem de hangi servisin sıkıntılı olduğunu anlayabilirsiniz.

Ödeme sistemlerinde önemli bir kural: aynı ödeme iki kez işlenmesin. Mesaj ID’sini idempotency key olarak kullanıp Redis’te saklayarak duplicate payment sorununu çözebilirsiniz. DLQ replay yaparken bu kontrol hayati önem taşır.

Üretimde Dikkat Edilmesi Gereken Noktalar

Birkaç yıldır RabbitMQ DLQ’yu farklı ölçeklerde kullanan sistemlerde gördüğüm en yaygın hatalar:

  • DLQ’nun sınırsız büyümesine izin vermek: DLQ’ya da x-max-length koyun. Aksi halde disk dolduğunda tüm RabbitMQ node’u çökebilir. DLQ için genellikle ana kuyruk kapasitesinin iki katını kullanıyorum.
  • Her hatayı DLQ’ya göndermek: Geçici ağ hataları için DLQ yerine kısa süreli requeue daha mantıklı. Sadece kalıcı ve işlenemeyen mesajlar DLQ’ya gitmelidir.
  • DLQ’yu temizlememek: DLQ bir “arşiv” değil, aksiyon beklenen bir kuyruktur. Düzenli olarak incelenip temizlenmesi gerekir.
  • Consumer sayısına dikkat etmek: DLQ’nun consumer’ı olmadığında mesajlar birikir. Bu zaten amacı, ama DLQ consumer’ı da yazılmalı ve hataları bir log/ticket sistemine iletmelidir.
  • Message property’lerini kaybetmemek: Replay sırasında orijinal mesajın header’larını ve correlation ID’sini koruyun. Aksi halde izleme ve debugging kabusa döner.

Sonuç

Dead Letter Queue, RabbitMQ’nun en değerli özelliklerinden biri ama aynı zamanda en çok yanlış anlaşılanı. “Kuyruk var, hata olan mesajlar oraya gider, tamam” yaklaşımı yetersiz. DLQ, aslında bir hata yönetim stratejisinin parçası olmalı.

Doğru kurulmuş bir DLQ yapısı size şunları sağlar: hiçbir mesajı kaybetmezsiniz, hangi mesajların neden başarısız olduğunu analiz edebilirsiniz, gerektiğinde seçici olarak replay yapabilirsiniz ve sistemin sağlığı hakkında erken uyarı alabilirsiniz.

Bu yazıda anlattığım her kod parçası ve yapılandırma kararı gerçek sistemlerde test edilmiş şeyler. Ölçeğinize ve ihtiyaçlarınıza göre uyarlamanız gerekebilir, ama temel prensipler değişmez: mesajı kaybet, ya hakkında bir şey yap ya da bile bile görmezden gel. İkincisi bir tercih değil, bir kaza olmalı.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir