Mesaj Kuyruğunda Idempotency ve At-Least-Once Delivery
Prodüksiyonda bir mesaj kuyruğu sistemi kurduğunuzda, ilk başta her şey güzel görünür. Mesajlar gönderiliyor, işleniyor, hayat güzel. Sonra bir gece üçte telefonunuz çalıyor: “Kullanıcılara çift ödeme gitti.” İşte o an at-least-once delivery ile idempotency’nin neden bu kadar kritik olduğunu anlıyorsunuz. Bu yazıda bu iki kavramı, aralarındaki ilişkiyi ve gerçek dünyada nasıl uyguladığımızı konuşacağız.
At-Least-Once Delivery Nedir ve Neden Varsayılan Budur
Mesaj kuyruğu sistemlerinin büyük çoğunluğu at-least-once delivery garantisi verir. RabbitMQ, Apache Kafka, AWS SQS, bunların hepsi bu kategoriye girer. Peki neden exactly-once değil de at-least-once?
Çünkü distributed sistemlerde exactly-once garantisi vermek inanılmaz derecede pahalıdır. Ağ bölünmesi, consumer crash’leri, network timeout’ları… bunların hepsinde “bu mesaj tam olarak bir kez işlendi” demek için koordinasyon maliyeti çok yüksektir. Kafka’nın idempotent producer özelliği bile tam anlamıyla exactly-once değil, producer tarafında deduplication sağlıyor.
At-least-once delivery şu anlama gelir: Bir mesajın işleneceğini garanti ederim, ama belki birden fazla kez işlenebilir. Consumer mesajı aldı, işledi, ama acknowledgment göndermeden çöktü mü? Broker mesajı tekrar kuyruğa koyar. Consumer yavaş çalıştığı için heartbeat timeout doldu mu? Yine aynı mesaj başka bir consumer’a gider.
# RabbitMQ'da bir mesajın requeue edilme durumunu izlemek için
rabbitmqctl list_queues name messages messages_ready messages_unacknowledged
# Unacknowledged mesajları görmek - bunlar potansiyel duplicate kaynağı
rabbitmqctl list_consumers
Şimdi asıl soruya gelelim: Eğer bir mesaj birden fazla kez işlenebiliyorsa, sisteminiz buna hazır mı?
Idempotency Kavramı: Matematiğin Mühendisliğe Katkısı
Idempotency matematikten gelir. Bir işlem idempotent’tir eğer aynı inputla defalarca çalıştırıldığında her seferinde aynı sonucu üretiyorsa. HTTP metodları açısından düşünün: GET idempotent’tir, DELETE idempotent’tir (kaynağı bulduysa sil, bulamazsan da 404 dön, sonuç aynı: kaynak yok), ama POST genellikle değildir.
Mesaj kuyruğu bağlamında idempotency şu demek: Aynı mesajı 5 kez işlesen de 1 kez işlesen de sistem aynı durumda kalmalı.
# Idempotent OLMAYAN handler - tehlikeli!
def process_payment_message(message):
user_id = message['user_id']
amount = message['amount']
# Bu kod çift çalışırsa kullanıcıdan iki kez para çekilir!
db.execute(
"INSERT INTO payments (user_id, amount, created_at) VALUES (?, ?, ?)",
(user_id, amount, datetime.now())
)
send_email(user_id, "Ödemeniz alındı")
# Idempotent handler - güvenli
def process_payment_message(message):
message_id = message['message_id'] # Unique identifier
user_id = message['user_id']
amount = message['amount']
# Önce bu mesajı daha önce işledik mi kontrol et
existing = db.execute(
"SELECT id FROM processed_messages WHERE message_id = ?",
(message_id,)
).fetchone()
if existing:
logger.info(f"Mesaj {message_id} zaten işlendi, atlıyorum")
return # Sessizce çık, hata değil bu
# Transaction içinde hem ödemi kaydet hem de mesajı işlenmiş olarak işaretle
with db.transaction():
db.execute(
"INSERT INTO payments (user_id, amount, message_id) VALUES (?, ?, ?)",
(user_id, amount, message_id)
)
db.execute(
"INSERT INTO processed_messages (message_id, processed_at) VALUES (?, ?)",
(message_id, datetime.now())
)
send_email(user_id, "Ödemeniz alındı")
Message ID Stratejisi: Kimin ID’sini Kullanacaksınız?
Bu noktada kritik bir soru çıkıyor: Mesaj ID’si nereden geliyor? Birkaç strateji var:
Broker’ın atadığı ID: RabbitMQ delivery tag, Kafka offset gibi. Bu seçenek tehlikelidir çünkü farklı consumer örnekleri farklı delivery tag alabilir ama aynı mesaj olabilir.
Producer’ın atadığı UUID: En sağlıklı yöntem. Mesajı oluşturan taraf UUID üretir ve bu ID mesajın ömrü boyunca değişmez.
Business key: İdeal senaryo. “Sipariş #12345 için ödeme işle” gibi. Bu durumda sipariş ID’si doğal idempotency key’i oluyor.
# Kafka'da mesaj key kullanımı - aynı key aynı partition'a gider
# Bu producer tarafında natural ordering sağlar
kafka-console-producer.sh
--broker-list localhost:9092
--topic payments
--property "parse.key=true"
--property "key.separator=:" << EOF
order-12345:{"order_id": "12345", "amount": 150.00, "currency": "TRY"}
order-12346:{"order_id": "12346", "amount": 275.50, "currency": "TRY"}
EOF
# Producer tarafında idempotency key oluşturma
import uuid
import json
from datetime import datetime
def create_payment_message(order_id, user_id, amount):
return {
"message_id": str(uuid.uuid4()), # Global unique ID
"idempotency_key": f"payment-{order_id}", # Business key
"order_id": order_id,
"user_id": user_id,
"amount": amount,
"currency": "TRY",
"created_at": datetime.utcnow().isoformat(),
"version": 1
}
# Mesajı gönderirken
message = create_payment_message(
order_id="ORD-2024-00123",
user_id="USR-456",
amount=299.90
)
# RabbitMQ'ya gönder
channel.basic_publish(
exchange='payments',
routing_key='payment.process',
body=json.dumps(message),
properties=pika.BasicProperties(
message_id=message['message_id'],
content_type='application/json',
delivery_mode=2 # Persistent
)
)
Deduplication Store: Redis ile Pratik Yaklaşım
Veritabanına her mesaj için sorgu atmak yük altında sorun yaratabilir. Redis burada devreye giriyor. TTL destekli bir deduplication store kurarsanız hem hızlı hem de otomatik temizlenen bir çözümünüz olur.
import redis
import json
from functools import wraps
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
def idempotent_handler(ttl_seconds=86400): # 24 saat default
def decorator(func):
@wraps(func)
def wrapper(message, *args, **kwargs):
message_id = message.get('message_id')
if not message_id:
raise ValueError("Mesaj ID'si yok, idempotency garantisi verilemiyor!")
dedup_key = f"processed:{func.__name__}:{message_id}"
# SET NX: Sadece key yoksa set et, atomik operasyon
was_set = redis_client.set(
dedup_key,
"1",
ex=ttl_seconds,
nx=True
)
if not was_set:
# Key zaten vardı, mesaj daha önce işlendi
print(f"Duplicate mesaj atlandı: {message_id}")
return None
try:
result = func(message, *args, **kwargs)
return result
except Exception as e:
# İşlem başarısız olduysa key'i sil ki tekrar denensin
redis_client.delete(dedup_key)
raise e
return wrapper
return decorator
@idempotent_handler(ttl_seconds=3600)
def process_order_created(message):
order_id = message['order_id']
# Sipariş işleme mantığı...
print(f"Sipariş işlendi: {order_id}")
return {"status": "success", "order_id": order_id}
Burada dikkat çekeceğim kritik bir nokta var: Exception durumunda key’i siliyoruz. Çünkü mesaj başarıyla işlenmediyse duplicate sayılmamalı, tekrar denenmelidir. Ama bu kararı iş mantığınıza göre vermelisiniz. Kimi zaman partial failure durumunda mesajı duplicate saymak daha güvenlidir.
Kafka’da Idempotent Producer Konfigürasyonu
Kafka, producer tarafında idempotency için built-in destek sunar. Ancak bu sadece broker’a yazma sırasında duplicate’i önler, consumer tarafını kapsamaz.
# Kafka producer konfigürasyonu - idempotent mod
cat > /opt/kafka/config/idempotent-producer.properties << 'EOF'
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
acks=all
enable.idempotence=true
max.in.flight.requests.per.connection=5
retries=2147483647
EOF
# Bu konfigürasyonla producer test et
kafka-producer-perf-test.sh
--topic payment-events
--num-records 10000
--record-size 1024
--throughput 1000
--producer.config /opt/kafka/config/idempotent-producer.properties
# Kafka consumer group offset yönetimi
# Aynı mesajı iki kez işlememek için offset'i doğru commit edin
# enable.auto.commit=false ile manuel commit yapın
kafka-consumer-groups.sh
--bootstrap-server localhost:9092
--describe
--group payment-processor-group
# Eğer bir consumer crash olduysa ve offset commit edilmediyse
# Lag değeri artar, mesajlar tekrar işlenir
# Bu tam olarak at-least-once delivery'nin kendisi
SQS ile At-Least-Once Delivery Yönetimi
AWS ortamında çalışıyorsanız SQS’in visibility timeout mekanizmasını iyi anlamak gerekiyor. Mesaj alındığında görünmez olur, ama silinmez. Eğer processing time, visibility timeout’u aşarsa mesaj tekrar görünür hale gelir.
import boto3
import json
from datetime import datetime
sqs = boto3.client('sqs', region_name='eu-west-1')
dynamodb = boto3.resource('dynamodb', region_name='eu-west-1')
QUEUE_URL = 'https://sqs.eu-west-1.amazonaws.com/123456789/payment-queue'
DEDUP_TABLE = dynamodb.Table('processed-messages')
def receive_and_process():
response = sqs.receive_message(
QueueUrl=QUEUE_URL,
MaxNumberOfMessages=10,
VisibilityTimeout=30, # 30 saniye - işlem bu sürede bitmeli
WaitTimeSeconds=20, # Long polling
AttributeNames=['All'],
MessageAttributeNames=['All']
)
messages = response.get('Messages', [])
for msg in messages:
receipt_handle = msg['ReceiptHandle']
body = json.loads(msg['Body'])
message_id = body.get('message_id', msg['MessageId'])
try:
# DynamoDB conditional write ile idempotency
DEDUP_TABLE.put_item(
Item={
'message_id': message_id,
'processed_at': datetime.utcnow().isoformat(),
'ttl': int(datetime.utcnow().timestamp()) + 86400 # 24 saat TTL
},
ConditionExpression='attribute_not_exists(message_id)'
)
# Asıl işi yap
process_payment(body)
# Başarılıysa mesajı sil
sqs.delete_message(
QueueUrl=QUEUE_URL,
ReceiptHandle=receipt_handle
)
except DEDUP_TABLE.meta.client.exceptions.ConditionalCheckFailedException:
# Mesaj zaten işlendi, sadece sil
print(f"Duplicate mesaj siliniyor: {message_id}")
sqs.delete_message(
QueueUrl=QUEUE_URL,
ReceiptHandle=receipt_handle
)
except Exception as e:
# İşlem başarısız, mesajı silme - visibility timeout dolunca tekrar görünür olacak
print(f"Hata: {e}, mesaj tekrar denenecek")
Dead Letter Queue ve Poison Message Yönetimi
At-least-once delivery ile çalışırken bir mesajın sürekli başarısız olup tekrar kuyruğa girdiği durumlarla karşılaşırsınız. Bu “poison message” problemidir. DLQ (Dead Letter Queue) bu mesajları yakalamak için kritiktir.
# RabbitMQ'da DLQ konfigürasyonu
rabbitmqadmin declare queue
name=payment.dlq
durable=true
rabbitmqadmin declare queue
name=payment.main
durable=true
arguments='{"x-dead-letter-exchange": "dlx", "x-max-retries": 3}'
rabbitmqadmin declare exchange
name=dlx
type=direct
rabbitmqadmin declare binding
source=dlx
destination=payment.dlq
routing_key=payment.main
# Retry sayısını mesaj header'ında takip etme
def handle_message_with_retry(channel, method, properties, body):
message = json.loads(body)
# Header'dan retry sayısını al
headers = properties.headers or {}
retry_count = headers.get('x-retry-count', 0)
max_retries = 3
try:
process_message(message)
channel.basic_ack(delivery_tag=method.delivery_tag)
except RetryableError as e:
if retry_count < max_retries:
# Exponential backoff ile tekrar kuyruğa ekle
wait_seconds = 2 ** retry_count
print(f"Retry {retry_count + 1}/{max_retries}, {wait_seconds}s sonra")
channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
# Delay ile tekrar yayınla
new_headers = {**headers, 'x-retry-count': retry_count + 1}
channel.basic_publish(
exchange='',
routing_key='payment.main',
body=body,
properties=pika.BasicProperties(
headers=new_headers,
expiration=str(wait_seconds * 1000) # ms cinsinden
)
)
else:
# Max retry aşıldı, DLQ'ya gönder
print(f"Max retry aşıldı, DLQ'ya gönderiliyor: {message.get('message_id')}")
channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
except NonRetryableError as e:
# Tekrarlanamaz hata, direkt DLQ'ya
print(f"Tekrarlanamaz hata: {e}")
channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
Outbox Pattern: En Güvenilir Yaklaşım
Bazen mesaj göndermek ve veritabanı işlemini atomik yapmak istersiniz. “Siparişi veritabanına kaydettim ama mesajı gönderirken crash oldum” senaryosunu önlemek için Outbox Pattern kullanılır.
# Outbox Pattern implementasyonu
def create_order_with_outbox(order_data):
with db.transaction():
# Siparişi kaydet
order_id = db.execute(
"INSERT INTO orders (user_id, amount, status) VALUES (?, ?, 'pending') RETURNING id",
(order_data['user_id'], order_data['amount'])
).fetchone()[0]
# Aynı transaction içinde outbox tablosuna mesajı kaydet
db.execute(
"""
INSERT INTO outbox_messages
(message_id, aggregate_type, aggregate_id, event_type, payload, status, created_at)
VALUES (?, 'Order', ?, 'OrderCreated', ?, 'pending', ?)
""",
(
str(uuid.uuid4()),
order_id,
json.dumps({
"order_id": order_id,
"user_id": order_data['user_id'],
"amount": order_data['amount']
}),
datetime.utcnow()
)
)
return order_id
# Ayrı bir worker outbox'ı okuyup mesajları yayınlar
def outbox_publisher():
while True:
pending_messages = db.execute(
"SELECT * FROM outbox_messages WHERE status = 'pending' ORDER BY created_at LIMIT 100"
).fetchall()
for msg in pending_messages:
try:
publish_to_broker(msg)
db.execute(
"UPDATE outbox_messages SET status = 'published', published_at = ? WHERE id = ?",
(datetime.utcnow(), msg['id'])
)
except Exception as e:
print(f"Yayın hatası: {e}")
time.sleep(1)
Monitoring: Duplicate Oranını Takip Edin
Üretim ortamında kaç mesajın duplicate olarak işlendiğini bilmek önemlidir. Çok yüksekse sisteminizde bir sorun var demektir.
# Prometheus metrics ile duplicate tracking
# Aşağıdaki gibi bir metrics endpoint'i expose edin
cat > /etc/prometheus/rules/message-queue.yml << 'EOF'
groups:
- name: message_queue_alerts
rules:
- alert: HighDuplicateRate
expr: |
rate(messages_duplicates_total[5m]) / rate(messages_processed_total[5m]) > 0.05
for: 2m
labels:
severity: warning
annotations:
summary: "Yüksek duplicate mesaj oranı: {{ $value | humanizePercentage }}"
description: "Son 5 dakikada duplicate oranı %5'i geçti"
- alert: UnacknowledgedMessagesHigh
expr: rabbitmq_queue_messages_unacknowledged > 1000
for: 5m
labels:
severity: critical
annotations:
summary: "Çok fazla unacknowledged mesaj var"
EOF
Sonuç
At-least-once delivery ve idempotency aslında bir trade-off kabulüdür. “Exactly-once garantisi vermeyi başaramıyorum, o zaman en azından mesajların kesinlikle işleneceğini garanti edeyim ve consumer tarafını buna dayanıklı hale getireyim” diyorsunuz.
Pratik özet olarak şunu söyleyebilirim: Her consumer’ınız idempotent olmak zorunda. Bu tartışılmaz. Bunu sağlamak için şu üç şeyi mutlaka yapın:
- Her mesaja unique ID atayın ve bunu producer tarafında belirleyin, broker’a bırakmayın
- Deduplication store kullanın, Redis veya DynamoDB gibi hızlı bir çözüm tercih edin, ilişkisel veritabanı yük altında darboğaz olabilir
- Outbox Pattern’i kritik business işlemler için uygulayın, özellikle ödeme ve stok gibi tutarlılığın hayati önem taşıdığı alanlarda
DLQ’ları ihmal etmeyin. Poison message’lar fark edilmeden sistemde dönerken kaynak tüketen sessiz bir düşman olabilir. Ve son olarak duplicate oranınızı mutlaka izleyin. Eğer bu oran aniden artıyorsa, consumer’larınızda bir sorun ya da konfigürasyon hatası var demektir.
Bu mimarileri doğru kurduğunuzda “gece üçte telefon” senaryolarından büyük ölçüde kurtulursunuz. Büyük ölçüde diyorum, çünkü distributed sistemlerde sıfır sorun diye bir şey yoktur, ama en azından çift ödeme göndermezsiniz.
