RabbitMQ Dead Letter Queue ile Hata Yönetimi
Üretim ortamında bir mesaj işlenemediğinde ne olur? Eğer doğru bir mekanizma kurulmamışsa, o mesaj ya kaybolur ya da sistemi sonsuza kadar meşgul eder. İkisi de felaket. RabbitMQ’nun Dead Letter Queue (DLQ) mekanizması tam da bu sorunu çözmek için var, ama doğru yapılandırılmadığında “kuruldu, tamam” demekten öteye geçemiyor. Bu yazıda DLQ’yu gerçekten işe yarar hale getirmenin yollarını, üretimde karşılaştığım senaryolarla birlikte anlatacağım.
Dead Letter Queue Nedir ve Neden Önemlidir
RabbitMQ’da bir mesaj üç durumda “dead letter” haline gelir:
- Reddedilme (rejected/nacked): Consumer mesajı
basic.rejectveyabasic.nackile reddeder verequeue=falseparametresi kullanılır - TTL aşımı: Mesajın yaşam süresi (Time-To-Live) dolmuştur
- Kuyruk dolmuştur: Kuyruk kapasitesi (
x-max-length) aşılmıştır
Bu üç durum gerçek hayatta sürekli yaşanır. Bir e-ticaret sisteminde sipariş işleme servisi çöktüğünde, yüzlerce sipariş mesajı kuyrukta bekler. Servis yeniden ayağa kalktığında bu mesajları nasıl işleyeceksiniz? Hangisi gerçekten bozuk, hangisi geçici bir ağ hatasından dolayı başarısız oldu? DLQ olmadan bu soruların cevabını bulmak, karanlıkta iğne aramaktan farklı değil.
Temel Yapılandırma: Exchange ve Queue İlişkisi
DLQ yapısı iki bileşenden oluşur: bir dead letter exchange (DLX) ve bu exchange’e bağlı bir veya daha fazla dead letter kuyruk. Ana kuyruğu oluştururken DLX’i argüman olarak belirtiyoruz.
# RabbitMQ Management API ile exchange oluşturma
curl -u guest:guest -X PUT
http://localhost:15672/api/exchanges/%2F/orders.dlx
-H "Content-Type: application/json"
-d '{"type":"direct","durable":true}'
# Dead letter kuyruğunu oluşturma
curl -u guest:guest -X PUT
http://localhost:15672/api/queues/%2F/orders.dead
-H "Content-Type: application/json"
-d '{
"durable": true,
"arguments": {}
}'
# DLX ile ana kuyruğu oluşturma
curl -u guest:guest -X PUT
http://localhost:15672/api/queues/%2F/orders.processing
-H "Content-Type: application/json"
-d '{
"durable": true,
"arguments": {
"x-dead-letter-exchange": "orders.dlx",
"x-dead-letter-routing-key": "orders.dead",
"x-message-ttl": 300000,
"x-max-length": 10000
}
}'
Burada x-message-ttl değerini 300000 milisaniye (5 dakika) olarak ayarladım. Bu değer sisteminizin doğasına göre değişmeli. Gerçek zamanlı ödeme sistemlerinde bu süreyi çok daha kısa tutuyoruz, 30-60 saniye gibi. Raporlama sistemlerinde ise saatlerle ifade edilebilir.
# Binding oluşturma: DLX'i dead letter kuyruğuna bağlama
curl -u guest:guest -X POST
http://localhost:15672/api/bindings/%2F/e/orders.dlx/q/orders.dead
-H "Content-Type: application/json"
-d '{"routing_key":"orders.dead","arguments":{}}'
Python ile Consumer ve DLQ İmplementasyonu
Şimdi bu yapıyı gerçek bir uygulama üzerinde gösterelim. Aşağıdaki örnek, sipariş işleme senaryosunu simüle ediyor.
import pika
import json
import logging
import time
from datetime import datetime
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def get_connection():
credentials = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters(
host='localhost',
port=5672,
virtual_host='/',
credentials=credentials,
heartbeat=600,
blocked_connection_timeout=300
)
return pika.BlockingConnection(parameters)
def setup_infrastructure(channel):
# Dead letter exchange
channel.exchange_declare(
exchange='orders.dlx',
exchange_type='direct',
durable=True
)
# Dead letter kuyrugu
channel.queue_declare(
queue='orders.dead',
durable=True
)
# DLX ile ana kuyruk
channel.queue_declare(
queue='orders.processing',
durable=True,
arguments={
'x-dead-letter-exchange': 'orders.dlx',
'x-dead-letter-routing-key': 'orders.dead',
'x-message-ttl': 300000,
'x-max-length': 10000
}
)
# Binding
channel.queue_bind(
exchange='orders.dlx',
queue='orders.dead',
routing_key='orders.dead'
)
logger.info("Altyapi basariyla olusturuldu")
def process_order(order_data):
"""
Gercek siparis isleme mantigi burada.
Hata durumlarini simule ediyoruz.
"""
order_id = order_data.get('order_id')
amount = order_data.get('amount', 0)
# Belirli siparisler icin hata simule et
if amount < 0:
raise ValueError(f"Gecersiz siparis tutari: {amount}")
if order_data.get('simulate_timeout'):
time.sleep(10)
logger.info(f"Siparis islendi: {order_id}, Tutar: {amount}")
return True
def callback(channel, method, properties, body):
try:
order_data = json.loads(body.decode('utf-8'))
logger.info(f"Mesaj alindi: {order_data.get('order_id')}")
# Mesajin kac kez denendigini kontrol et
retry_count = 0
if properties.headers:
death_header = properties.headers.get('x-death', [])
if death_header:
retry_count = death_header[0].get('count', 0)
if retry_count >= 3:
logger.error(
f"Maksimum deneme sayisina ulasildi: {order_data.get('order_id')}"
)
channel.basic_reject(
delivery_tag=method.delivery_tag,
requeue=False
)
return
process_order(order_data)
channel.basic_ack(delivery_tag=method.delivery_tag)
except ValueError as e:
logger.error(f"Kalici hata, DLQ'ya gonderiliyor: {e}")
channel.basic_reject(
delivery_tag=method.delivery_tag,
requeue=False
)
except Exception as e:
logger.warning(f"Gecici hata, yeniden kuyruga alinacak: {e}")
channel.basic_nack(
delivery_tag=method.delivery_tag,
requeue=False
)
def main():
connection = get_connection()
channel = connection.channel()
setup_infrastructure(channel)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
queue='orders.processing',
on_message_callback=callback
)
logger.info("Consumer baslatildi, mesajlar bekleniyor...")
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
finally:
connection.close()
if __name__ == '__main__':
main()
Bu kodda dikkat edilmesi gereken önemli bir nokta var: x-death header’ı. RabbitMQ bir mesaj dead letter’a düştüğünde bu header’ı otomatik olarak ekliyor. İçinde mesajın kaç kez “öldüğü”, hangi kuyruktan geldiği ve sebebi gibi bilgiler yer alıyor. Bu bilgiyi okuyarak akıllı retry mekanizmaları kurabilirsiniz.
Delayed Retry Mekanizması
Basit reject/nack yerine daha sofistike bir yaklaşım olan “delayed retry” (gecikmeli yeniden deneme) çok daha sağlıklı sonuçlar veriyor. Bunun için rabbitmq-delayed-message-exchange plugin’ini kullanacağız.
# Plugin'i etkinlestir
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# Delayed exchange olustur
curl -u guest:guest -X PUT
http://localhost:15672/api/exchanges/%2F/orders.retry
-H "Content-Type: application/json"
-d '{
"type":"x-delayed-message",
"durable":true,
"arguments":{
"x-delayed-type":"direct"
}
}'
def publish_with_retry_delay(channel, order_data, retry_count):
"""
Exponential backoff ile yeniden deneme:
1. deneme: 30 saniye
2. deneme: 60 saniye
3. deneme: 120 saniye
"""
delay_seconds = 30 * (2 ** retry_count)
delay_ms = delay_seconds * 1000
# Maksimum gecikmeyi 10 dakika ile sinirla
delay_ms = min(delay_ms, 600000)
order_data['retry_count'] = retry_count + 1
order_data['last_retry_at'] = datetime.now().isoformat()
channel.basic_publish(
exchange='orders.retry',
routing_key='orders.processing',
body=json.dumps(order_data),
properties=pika.BasicProperties(
delivery_mode=2, # Mesaji kalici yap
headers={
'x-delay': delay_ms,
'x-retry-count': retry_count + 1
}
)
)
logger.info(
f"Mesaj {delay_seconds} saniye sonra yeniden denenecek: "
f"{order_data.get('order_id')}"
)
Exponential backoff burada kritik. Bir servis geçici olarak çöktüğünde, tüm mesajları aynı anda tekrar göndermek “thundering herd” problemine yol açar. 30-60-120 saniye gibi artan aralıklarla yeniden deneme, sisteme nefes alma fırsatı verir.
DLQ Monitoring ve Alerting
DLQ’yu kurmak yeterli değil. Üretimde ne zaman mesajların dead letter’a düştüğünü anlık bilmek gerekiyor.
#!/bin/bash
# dlq_monitor.sh - DLQ mesaj sayisini kontrol eder ve uyari gonderir
RABBIT_HOST="localhost"
RABBIT_PORT="15672"
RABBIT_USER="admin"
RABBIT_PASS="secretpassword"
QUEUE_NAME="orders.dead"
THRESHOLD=10
ALERT_WEBHOOK="https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
get_queue_depth() {
curl -s -u "${RABBIT_USER}:${RABBIT_PASS}"
"http://${RABBIT_HOST}:${RABBIT_PORT}/api/queues/%2F/${QUEUE_NAME}"
| python3 -c "import sys,json; data=json.load(sys.stdin); print(data.get('messages', 0))"
}
send_slack_alert() {
local message=$1
curl -s -X POST
-H 'Content-type: application/json'
--data "{"text":"${message}"}"
"${ALERT_WEBHOOK}"
}
CURRENT_DEPTH=$(get_queue_depth)
if [ "${CURRENT_DEPTH}" -gt "${THRESHOLD}" ]; then
ALERT_MSG="UYARI: DLQ '${QUEUE_NAME}' kuyruğunda ${CURRENT_DEPTH} mesaj birikti! Threshold: ${THRESHOLD}"
logger -t dlq_monitor "${ALERT_MSG}"
send_slack_alert "${ALERT_MSG}"
echo "$(date): ${ALERT_MSG}" >> /var/log/rabbitmq/dlq_alerts.log
fi
echo "$(date): DLQ derinligi: ${CURRENT_DEPTH}" >> /var/log/rabbitmq/dlq_depth.log
# Cron ile her 5 dakikada bir calistir
*/5 * * * * /usr/local/bin/dlq_monitor.sh
Bu script’i Prometheus + Grafana ile de entegre edebilirsiniz. RabbitMQ’nun rabbitmq_prometheus plugin’i sayesinde tüm metrikler otomatik olarak Prometheus’a akar, oradan Grafana alert’leri kurabilirsiniz. Ancak basit bir monitoring için bu bash yaklaşımı hala işlevselliğini koruyor.
DLQ’daki Mesajları İşleme ve Requeue
Dead letter kuyruğuna düşmüş mesajları analiz edip yeniden işlemeye sokmak için bir yönetim aracına ihtiyacınız var.
#!/usr/bin/env python3
"""
dlq_replay.py - DLQ'daki mesajlari analiz eder ve
secici olarak ana kuyruğa geri gonderir
"""
import pika
import json
import argparse
from datetime import datetime
def connect():
credentials = pika.PlainCredentials('admin', 'secretpassword')
params = pika.ConnectionParameters(
host='localhost',
credentials=credentials
)
return pika.BlockingConnection(params)
def analyze_dlq(max_messages=100):
"""DLQ'daki mesajlari analiz eder, silmez"""
connection = connect()
channel = connection.channel()
messages = []
error_summary = {}
for _ in range(max_messages):
method, properties, body = channel.basic_get(
queue='orders.dead',
auto_ack=False
)
if method is None:
break
try:
data = json.loads(body.decode('utf-8'))
# x-death header'indan hata nedenini cikar
death_info = {}
if properties.headers and 'x-death' in properties.headers:
death_entry = properties.headers['x-death'][0]
death_info = {
'reason': death_entry.get('reason', 'unknown'),
'queue': death_entry.get('queue', 'unknown'),
'count': death_entry.get('count', 0),
'time': str(death_entry.get('time', ''))
}
reason = death_info['reason']
error_summary[reason] = error_summary.get(reason, 0) + 1
messages.append({
'data': data,
'death_info': death_info,
'delivery_tag': method.delivery_tag
})
except json.JSONDecodeError as e:
print(f"JSON parse hatasi: {e}")
# Mesajlari nack ile geri koy (silme)
for msg in messages:
channel.basic_nack(
delivery_tag=msg['delivery_tag'],
requeue=True
)
connection.close()
print("n--- DLQ Analiz Raporu ---")
print(f"Toplam incelenen mesaj: {len(messages)}")
print("nHata dagılımı:")
for reason, count in error_summary.items():
print(f" {reason}: {count} mesaj")
return messages
def replay_messages(filter_func=None, dry_run=True):
"""
DLQ'daki mesajlari ana kuyruğa geri gonder.
filter_func: True dondurenler replay edilir
dry_run: True ise gercekten gondermez, sadece rapor eder
"""
connection = connect()
channel = connection.channel()
replayed = 0
skipped = 0
while True:
method, properties, body = channel.basic_get(
queue='orders.dead',
auto_ack=False
)
if method is None:
break
data = json.loads(body.decode('utf-8'))
should_replay = True
if filter_func:
should_replay = filter_func(data)
if should_replay:
if not dry_run:
channel.basic_publish(
exchange='',
routing_key='orders.processing',
body=body,
properties=pika.BasicProperties(
delivery_mode=2,
headers={'x-replayed-at': datetime.now().isoformat()}
)
)
channel.basic_ack(delivery_tag=method.delivery_tag)
else:
print(f"[DRY RUN] Replay edilecek: {data.get('order_id')}")
channel.basic_nack(
delivery_tag=method.delivery_tag,
requeue=True
)
replayed += 1
else:
channel.basic_nack(
delivery_tag=method.delivery_tag,
requeue=True
)
skipped += 1
connection.close()
print(f"nSonuc: {replayed} replay, {skipped} atlandı")
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--action', choices=['analyze', 'replay'], required=True)
parser.add_argument('--dry-run', action='store_true', default=True)
args = parser.parse_args()
if args.action == 'analyze':
analyze_dlq()
elif args.action == 'replay':
replay_messages(dry_run=args.dry_run)
dry_run parametresi bu araçta çok önemli. Üretimde bir şeyleri geri göndermeden önce ne gönderdiğinizi kesinlikle bilmeniz gerekiyor. Bir keresinde dry_run olmadan doğrudan replay çalıştıran bir mühendis, sistemdeki bozuk bir data migration’ın ürettiği 50.000 mesajı ana kuyruğa geri gönderdi. Sonuç? İki saatlik downtime.
Gerçek Dünya Senaryosu: E-ticaret Ödeme Akışı
Somut bir örnek üzerinden DLQ tasarımını ele alalım. Bir ödeme işleme sisteminde şu kuyruklara ihtiyaç duyuyoruz:
- payment.incoming: Yeni ödeme talepleri
- payment.validation: Kart doğrulama
- payment.processing: Gerçek ödeme işlemi
- payment.dead: Tüm aşamalardaki başarısız mesajlar
Her kuyruğun farklı TTL ve max-length değerlerine ihtiyacı var:
- payment.incoming için TTL: 60 saniye (ödeme talepleri bayat olmamalı)
- payment.validation için TTL: 30 saniye (doğrulama hızlı olmalı)
- payment.processing için TTL: 120 saniye (banka API’si yavaş olabilir)
DLQ’ya düşen bir ödeme mesajının x-death header’ında hangi aşamada başarısız olduğu, kaç kez denendiği ve zaman damgası yer alıyor. Bu bilgiyle hem müşteriye doğru bir hata mesajı gösterebilir hem de hangi servisin sıkıntılı olduğunu anlayabilirsiniz.
Ödeme sistemlerinde önemli bir kural: aynı ödeme iki kez işlenmesin. Mesaj ID’sini idempotency key olarak kullanıp Redis’te saklayarak duplicate payment sorununu çözebilirsiniz. DLQ replay yaparken bu kontrol hayati önem taşır.
Üretimde Dikkat Edilmesi Gereken Noktalar
Birkaç yıldır RabbitMQ DLQ’yu farklı ölçeklerde kullanan sistemlerde gördüğüm en yaygın hatalar:
- DLQ’nun sınırsız büyümesine izin vermek: DLQ’ya da
x-max-lengthkoyun. Aksi halde disk dolduğunda tüm RabbitMQ node’u çökebilir. DLQ için genellikle ana kuyruk kapasitesinin iki katını kullanıyorum.
- Her hatayı DLQ’ya göndermek: Geçici ağ hataları için DLQ yerine kısa süreli requeue daha mantıklı. Sadece kalıcı ve işlenemeyen mesajlar DLQ’ya gitmelidir.
- DLQ’yu temizlememek: DLQ bir “arşiv” değil, aksiyon beklenen bir kuyruktur. Düzenli olarak incelenip temizlenmesi gerekir.
- Consumer sayısına dikkat etmek: DLQ’nun consumer’ı olmadığında mesajlar birikir. Bu zaten amacı, ama DLQ consumer’ı da yazılmalı ve hataları bir log/ticket sistemine iletmelidir.
- Message property’lerini kaybetmemek: Replay sırasında orijinal mesajın header’larını ve correlation ID’sini koruyun. Aksi halde izleme ve debugging kabusa döner.
Sonuç
Dead Letter Queue, RabbitMQ’nun en değerli özelliklerinden biri ama aynı zamanda en çok yanlış anlaşılanı. “Kuyruk var, hata olan mesajlar oraya gider, tamam” yaklaşımı yetersiz. DLQ, aslında bir hata yönetim stratejisinin parçası olmalı.
Doğru kurulmuş bir DLQ yapısı size şunları sağlar: hiçbir mesajı kaybetmezsiniz, hangi mesajların neden başarısız olduğunu analiz edebilirsiniz, gerektiğinde seçici olarak replay yapabilirsiniz ve sistemin sağlığı hakkında erken uyarı alabilirsiniz.
Bu yazıda anlattığım her kod parçası ve yapılandırma kararı gerçek sistemlerde test edilmiş şeyler. Ölçeğinize ve ihtiyaçlarınıza göre uyarlamanız gerekebilir, ama temel prensipler değişmez: mesajı kaybet, ya hakkında bir şey yap ya da bile bile görmezden gel. İkincisi bir tercih değil, bir kaza olmalı.
