Mesaj Kuyruğunda Idempotency ve At-Least-Once Delivery

Prodüksiyonda bir mesaj kuyruğu sistemi kurduğunuzda, ilk başta her şey güzel görünür. Mesajlar gönderiliyor, işleniyor, hayat güzel. Sonra bir gece üçte telefonunuz çalıyor: “Kullanıcılara çift ödeme gitti.” İşte o an at-least-once delivery ile idempotency’nin neden bu kadar kritik olduğunu anlıyorsunuz. Bu yazıda bu iki kavramı, aralarındaki ilişkiyi ve gerçek dünyada nasıl uyguladığımızı konuşacağız.

At-Least-Once Delivery Nedir ve Neden Varsayılan Budur

Mesaj kuyruğu sistemlerinin büyük çoğunluğu at-least-once delivery garantisi verir. RabbitMQ, Apache Kafka, AWS SQS, bunların hepsi bu kategoriye girer. Peki neden exactly-once değil de at-least-once?

Çünkü distributed sistemlerde exactly-once garantisi vermek inanılmaz derecede pahalıdır. Ağ bölünmesi, consumer crash’leri, network timeout’ları… bunların hepsinde “bu mesaj tam olarak bir kez işlendi” demek için koordinasyon maliyeti çok yüksektir. Kafka’nın idempotent producer özelliği bile tam anlamıyla exactly-once değil, producer tarafında deduplication sağlıyor.

At-least-once delivery şu anlama gelir: Bir mesajın işleneceğini garanti ederim, ama belki birden fazla kez işlenebilir. Consumer mesajı aldı, işledi, ama acknowledgment göndermeden çöktü mü? Broker mesajı tekrar kuyruğa koyar. Consumer yavaş çalıştığı için heartbeat timeout doldu mu? Yine aynı mesaj başka bir consumer’a gider.

# RabbitMQ'da bir mesajın requeue edilme durumunu izlemek için
rabbitmqctl list_queues name messages messages_ready messages_unacknowledged

# Unacknowledged mesajları görmek - bunlar potansiyel duplicate kaynağı
rabbitmqctl list_consumers

Şimdi asıl soruya gelelim: Eğer bir mesaj birden fazla kez işlenebiliyorsa, sisteminiz buna hazır mı?

Idempotency Kavramı: Matematiğin Mühendisliğe Katkısı

Idempotency matematikten gelir. Bir işlem idempotent’tir eğer aynı inputla defalarca çalıştırıldığında her seferinde aynı sonucu üretiyorsa. HTTP metodları açısından düşünün: GET idempotent’tir, DELETE idempotent’tir (kaynağı bulduysa sil, bulamazsan da 404 dön, sonuç aynı: kaynak yok), ama POST genellikle değildir.

Mesaj kuyruğu bağlamında idempotency şu demek: Aynı mesajı 5 kez işlesen de 1 kez işlesen de sistem aynı durumda kalmalı.

# Idempotent OLMAYAN handler - tehlikeli!
def process_payment_message(message):
    user_id = message['user_id']
    amount = message['amount']
    
    # Bu kod çift çalışırsa kullanıcıdan iki kez para çekilir!
    db.execute(
        "INSERT INTO payments (user_id, amount, created_at) VALUES (?, ?, ?)",
        (user_id, amount, datetime.now())
    )
    send_email(user_id, "Ödemeniz alındı")
# Idempotent handler - güvenli
def process_payment_message(message):
    message_id = message['message_id']  # Unique identifier
    user_id = message['user_id']
    amount = message['amount']
    
    # Önce bu mesajı daha önce işledik mi kontrol et
    existing = db.execute(
        "SELECT id FROM processed_messages WHERE message_id = ?",
        (message_id,)
    ).fetchone()
    
    if existing:
        logger.info(f"Mesaj {message_id} zaten işlendi, atlıyorum")
        return  # Sessizce çık, hata değil bu
    
    # Transaction içinde hem ödemi kaydet hem de mesajı işlenmiş olarak işaretle
    with db.transaction():
        db.execute(
            "INSERT INTO payments (user_id, amount, message_id) VALUES (?, ?, ?)",
            (user_id, amount, message_id)
        )
        db.execute(
            "INSERT INTO processed_messages (message_id, processed_at) VALUES (?, ?)",
            (message_id, datetime.now())
        )
    
    send_email(user_id, "Ödemeniz alındı")

Message ID Stratejisi: Kimin ID’sini Kullanacaksınız?

Bu noktada kritik bir soru çıkıyor: Mesaj ID’si nereden geliyor? Birkaç strateji var:

Broker’ın atadığı ID: RabbitMQ delivery tag, Kafka offset gibi. Bu seçenek tehlikelidir çünkü farklı consumer örnekleri farklı delivery tag alabilir ama aynı mesaj olabilir.

Producer’ın atadığı UUID: En sağlıklı yöntem. Mesajı oluşturan taraf UUID üretir ve bu ID mesajın ömrü boyunca değişmez.

Business key: İdeal senaryo. “Sipariş #12345 için ödeme işle” gibi. Bu durumda sipariş ID’si doğal idempotency key’i oluyor.

# Kafka'da mesaj key kullanımı - aynı key aynı partition'a gider
# Bu producer tarafında natural ordering sağlar
kafka-console-producer.sh 
  --broker-list localhost:9092 
  --topic payments 
  --property "parse.key=true" 
  --property "key.separator=:" << EOF
order-12345:{"order_id": "12345", "amount": 150.00, "currency": "TRY"}
order-12346:{"order_id": "12346", "amount": 275.50, "currency": "TRY"}
EOF
# Producer tarafında idempotency key oluşturma
import uuid
import json
from datetime import datetime

def create_payment_message(order_id, user_id, amount):
    return {
        "message_id": str(uuid.uuid4()),  # Global unique ID
        "idempotency_key": f"payment-{order_id}",  # Business key
        "order_id": order_id,
        "user_id": user_id,
        "amount": amount,
        "currency": "TRY",
        "created_at": datetime.utcnow().isoformat(),
        "version": 1
    }

# Mesajı gönderirken
message = create_payment_message(
    order_id="ORD-2024-00123",
    user_id="USR-456",
    amount=299.90
)

# RabbitMQ'ya gönder
channel.basic_publish(
    exchange='payments',
    routing_key='payment.process',
    body=json.dumps(message),
    properties=pika.BasicProperties(
        message_id=message['message_id'],
        content_type='application/json',
        delivery_mode=2  # Persistent
    )
)

Deduplication Store: Redis ile Pratik Yaklaşım

Veritabanına her mesaj için sorgu atmak yük altında sorun yaratabilir. Redis burada devreye giriyor. TTL destekli bir deduplication store kurarsanız hem hızlı hem de otomatik temizlenen bir çözümünüz olur.

import redis
import json
from functools import wraps

redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)

def idempotent_handler(ttl_seconds=86400):  # 24 saat default
    def decorator(func):
        @wraps(func)
        def wrapper(message, *args, **kwargs):
            message_id = message.get('message_id')
            
            if not message_id:
                raise ValueError("Mesaj ID'si yok, idempotency garantisi verilemiyor!")
            
            dedup_key = f"processed:{func.__name__}:{message_id}"
            
            # SET NX: Sadece key yoksa set et, atomik operasyon
            was_set = redis_client.set(
                dedup_key, 
                "1", 
                ex=ttl_seconds, 
                nx=True
            )
            
            if not was_set:
                # Key zaten vardı, mesaj daha önce işlendi
                print(f"Duplicate mesaj atlandı: {message_id}")
                return None
            
            try:
                result = func(message, *args, **kwargs)
                return result
            except Exception as e:
                # İşlem başarısız olduysa key'i sil ki tekrar denensin
                redis_client.delete(dedup_key)
                raise e
        
        return wrapper
    return decorator

@idempotent_handler(ttl_seconds=3600)
def process_order_created(message):
    order_id = message['order_id']
    # Sipariş işleme mantığı...
    print(f"Sipariş işlendi: {order_id}")
    return {"status": "success", "order_id": order_id}

Burada dikkat çekeceğim kritik bir nokta var: Exception durumunda key’i siliyoruz. Çünkü mesaj başarıyla işlenmediyse duplicate sayılmamalı, tekrar denenmelidir. Ama bu kararı iş mantığınıza göre vermelisiniz. Kimi zaman partial failure durumunda mesajı duplicate saymak daha güvenlidir.

Kafka’da Idempotent Producer Konfigürasyonu

Kafka, producer tarafında idempotency için built-in destek sunar. Ancak bu sadece broker’a yazma sırasında duplicate’i önler, consumer tarafını kapsamaz.

# Kafka producer konfigürasyonu - idempotent mod
cat > /opt/kafka/config/idempotent-producer.properties << 'EOF'
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
acks=all
enable.idempotence=true
max.in.flight.requests.per.connection=5
retries=2147483647
EOF

# Bu konfigürasyonla producer test et
kafka-producer-perf-test.sh 
  --topic payment-events 
  --num-records 10000 
  --record-size 1024 
  --throughput 1000 
  --producer.config /opt/kafka/config/idempotent-producer.properties
# Kafka consumer group offset yönetimi
# Aynı mesajı iki kez işlememek için offset'i doğru commit edin
# enable.auto.commit=false ile manuel commit yapın

kafka-consumer-groups.sh 
  --bootstrap-server localhost:9092 
  --describe 
  --group payment-processor-group

# Eğer bir consumer crash olduysa ve offset commit edilmediyse
# Lag değeri artar, mesajlar tekrar işlenir
# Bu tam olarak at-least-once delivery'nin kendisi

SQS ile At-Least-Once Delivery Yönetimi

AWS ortamında çalışıyorsanız SQS’in visibility timeout mekanizmasını iyi anlamak gerekiyor. Mesaj alındığında görünmez olur, ama silinmez. Eğer processing time, visibility timeout’u aşarsa mesaj tekrar görünür hale gelir.

import boto3
import json
from datetime import datetime

sqs = boto3.client('sqs', region_name='eu-west-1')
dynamodb = boto3.resource('dynamodb', region_name='eu-west-1')

QUEUE_URL = 'https://sqs.eu-west-1.amazonaws.com/123456789/payment-queue'
DEDUP_TABLE = dynamodb.Table('processed-messages')

def receive_and_process():
    response = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=10,
        VisibilityTimeout=30,  # 30 saniye - işlem bu sürede bitmeli
        WaitTimeSeconds=20,    # Long polling
        AttributeNames=['All'],
        MessageAttributeNames=['All']
    )
    
    messages = response.get('Messages', [])
    
    for msg in messages:
        receipt_handle = msg['ReceiptHandle']
        body = json.loads(msg['Body'])
        message_id = body.get('message_id', msg['MessageId'])
        
        try:
            # DynamoDB conditional write ile idempotency
            DEDUP_TABLE.put_item(
                Item={
                    'message_id': message_id,
                    'processed_at': datetime.utcnow().isoformat(),
                    'ttl': int(datetime.utcnow().timestamp()) + 86400  # 24 saat TTL
                },
                ConditionExpression='attribute_not_exists(message_id)'
            )
            
            # Asıl işi yap
            process_payment(body)
            
            # Başarılıysa mesajı sil
            sqs.delete_message(
                QueueUrl=QUEUE_URL,
                ReceiptHandle=receipt_handle
            )
            
        except DEDUP_TABLE.meta.client.exceptions.ConditionalCheckFailedException:
            # Mesaj zaten işlendi, sadece sil
            print(f"Duplicate mesaj siliniyor: {message_id}")
            sqs.delete_message(
                QueueUrl=QUEUE_URL,
                ReceiptHandle=receipt_handle
            )
        except Exception as e:
            # İşlem başarısız, mesajı silme - visibility timeout dolunca tekrar görünür olacak
            print(f"Hata: {e}, mesaj tekrar denenecek")

Dead Letter Queue ve Poison Message Yönetimi

At-least-once delivery ile çalışırken bir mesajın sürekli başarısız olup tekrar kuyruğa girdiği durumlarla karşılaşırsınız. Bu “poison message” problemidir. DLQ (Dead Letter Queue) bu mesajları yakalamak için kritiktir.

# RabbitMQ'da DLQ konfigürasyonu
rabbitmqadmin declare queue 
  name=payment.dlq 
  durable=true

rabbitmqadmin declare queue 
  name=payment.main 
  durable=true 
  arguments='{"x-dead-letter-exchange": "dlx", "x-max-retries": 3}'

rabbitmqadmin declare exchange 
  name=dlx 
  type=direct

rabbitmqadmin declare binding 
  source=dlx 
  destination=payment.dlq 
  routing_key=payment.main
# Retry sayısını mesaj header'ında takip etme
def handle_message_with_retry(channel, method, properties, body):
    message = json.loads(body)
    
    # Header'dan retry sayısını al
    headers = properties.headers or {}
    retry_count = headers.get('x-retry-count', 0)
    max_retries = 3
    
    try:
        process_message(message)
        channel.basic_ack(delivery_tag=method.delivery_tag)
        
    except RetryableError as e:
        if retry_count < max_retries:
            # Exponential backoff ile tekrar kuyruğa ekle
            wait_seconds = 2 ** retry_count
            print(f"Retry {retry_count + 1}/{max_retries}, {wait_seconds}s sonra")
            
            channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
            
            # Delay ile tekrar yayınla
            new_headers = {**headers, 'x-retry-count': retry_count + 1}
            channel.basic_publish(
                exchange='',
                routing_key='payment.main',
                body=body,
                properties=pika.BasicProperties(
                    headers=new_headers,
                    expiration=str(wait_seconds * 1000)  # ms cinsinden
                )
            )
        else:
            # Max retry aşıldı, DLQ'ya gönder
            print(f"Max retry aşıldı, DLQ'ya gönderiliyor: {message.get('message_id')}")
            channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
    
    except NonRetryableError as e:
        # Tekrarlanamaz hata, direkt DLQ'ya
        print(f"Tekrarlanamaz hata: {e}")
        channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

Outbox Pattern: En Güvenilir Yaklaşım

Bazen mesaj göndermek ve veritabanı işlemini atomik yapmak istersiniz. “Siparişi veritabanına kaydettim ama mesajı gönderirken crash oldum” senaryosunu önlemek için Outbox Pattern kullanılır.

# Outbox Pattern implementasyonu
def create_order_with_outbox(order_data):
    with db.transaction():
        # Siparişi kaydet
        order_id = db.execute(
            "INSERT INTO orders (user_id, amount, status) VALUES (?, ?, 'pending') RETURNING id",
            (order_data['user_id'], order_data['amount'])
        ).fetchone()[0]
        
        # Aynı transaction içinde outbox tablosuna mesajı kaydet
        db.execute(
            """
            INSERT INTO outbox_messages 
            (message_id, aggregate_type, aggregate_id, event_type, payload, status, created_at)
            VALUES (?, 'Order', ?, 'OrderCreated', ?, 'pending', ?)
            """,
            (
                str(uuid.uuid4()),
                order_id,
                json.dumps({
                    "order_id": order_id,
                    "user_id": order_data['user_id'],
                    "amount": order_data['amount']
                }),
                datetime.utcnow()
            )
        )
        
    return order_id

# Ayrı bir worker outbox'ı okuyup mesajları yayınlar
def outbox_publisher():
    while True:
        pending_messages = db.execute(
            "SELECT * FROM outbox_messages WHERE status = 'pending' ORDER BY created_at LIMIT 100"
        ).fetchall()
        
        for msg in pending_messages:
            try:
                publish_to_broker(msg)
                db.execute(
                    "UPDATE outbox_messages SET status = 'published', published_at = ? WHERE id = ?",
                    (datetime.utcnow(), msg['id'])
                )
            except Exception as e:
                print(f"Yayın hatası: {e}")
        
        time.sleep(1)

Monitoring: Duplicate Oranını Takip Edin

Üretim ortamında kaç mesajın duplicate olarak işlendiğini bilmek önemlidir. Çok yüksekse sisteminizde bir sorun var demektir.

# Prometheus metrics ile duplicate tracking
# Aşağıdaki gibi bir metrics endpoint'i expose edin

cat > /etc/prometheus/rules/message-queue.yml << 'EOF'
groups:
  - name: message_queue_alerts
    rules:
      - alert: HighDuplicateRate
        expr: |
          rate(messages_duplicates_total[5m]) / rate(messages_processed_total[5m]) > 0.05
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "Yüksek duplicate mesaj oranı: {{ $value | humanizePercentage }}"
          description: "Son 5 dakikada duplicate oranı %5'i geçti"
      
      - alert: UnacknowledgedMessagesHigh
        expr: rabbitmq_queue_messages_unacknowledged > 1000
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Çok fazla unacknowledged mesaj var"
EOF

Sonuç

At-least-once delivery ve idempotency aslında bir trade-off kabulüdür. “Exactly-once garantisi vermeyi başaramıyorum, o zaman en azından mesajların kesinlikle işleneceğini garanti edeyim ve consumer tarafını buna dayanıklı hale getireyim” diyorsunuz.

Pratik özet olarak şunu söyleyebilirim: Her consumer’ınız idempotent olmak zorunda. Bu tartışılmaz. Bunu sağlamak için şu üç şeyi mutlaka yapın:

  • Her mesaja unique ID atayın ve bunu producer tarafında belirleyin, broker’a bırakmayın
  • Deduplication store kullanın, Redis veya DynamoDB gibi hızlı bir çözüm tercih edin, ilişkisel veritabanı yük altında darboğaz olabilir
  • Outbox Pattern’i kritik business işlemler için uygulayın, özellikle ödeme ve stok gibi tutarlılığın hayati önem taşıdığı alanlarda

DLQ’ları ihmal etmeyin. Poison message’lar fark edilmeden sistemde dönerken kaynak tüketen sessiz bir düşman olabilir. Ve son olarak duplicate oranınızı mutlaka izleyin. Eğer bu oran aniden artıyorsa, consumer’larınızda bir sorun ya da konfigürasyon hatası var demektir.

Bu mimarileri doğru kurduğunuzda “gece üçte telefon” senaryolarından büyük ölçüde kurtulursunuz. Büyük ölçüde diyorum, çünkü distributed sistemlerde sıfır sorun diye bir şey yoktur, ama en azından çift ödeme göndermezsiniz.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir