Saga Pattern ile Dağıtık İşlem Yönetimi

Dağıtık sistemlerde işlem yönetimi, kariyer boyunca karşılaşacağınız en karmaşık sorunlardan biridir. Monolitik bir uygulamada her şey tek bir veritabanı transaction’ı içinde çözülürken, mikroservis mimarisine geçtiğinizde bu konfor ortadan kalkar. Bir e-ticaret siparişini düşünün: stok servisi, ödeme servisi, kargo servisi ve bildirim servisi… Bunların hepsinin tutarlı bir şekilde çalışması gerekiyor. İşte Saga Pattern tam bu noktada devreye giriyor.

Neden Dağıtık Transaction Bu Kadar Zor?

İlk mikroservis projesinde 2PC (Two-Phase Commit) kullanmaya çalışan herkes ne demek istediğimi anlayacaktır. Teoride güzel görünür, pratikte dağıtık sistemlerin gerçekliğiyle yüz yüze gelirsiniz. Ağ bölünmeleri, servis çökmeleri, timeout’lar… 2PC bu senaryolarda hem performans hem de güvenilirlik açısından felaket olabilir.

CAP teoremi bize şunu söylüyor: Consistency, Availability ve Partition Tolerance’ın üçünü aynı anda elde edemezsiniz. Gerçek dünya mikroservis mimarilerinde çoğunlukla AP (Available + Partition Tolerant) tarafında yer alırız ve eventual consistency ile çalışırız. Saga Pattern tam da bu eventual consistency modelini uygulama katmanında yönetmenin sistematik bir yoludur.

Saga Pattern’in Temel Mantığı

Saga, uzun süren bir işlemi birbirine bağlı küçük lokal transaction’lara böler. Her lokal transaction kendi servisinin veritabanını günceller ve bir sonraki adımı tetikleyen bir event yayınlar. Eğer herhangi bir adımda hata oluşursa, önceki adımların etkilerini geri almak için compensating transaction (telafi işlemi) çalıştırılır.

Şunu net söyleyeyim: Compensating transaction, gerçek bir rollback değildir. Veritabanı seviyesindeki atomik rollback gibi çalışmaz. Bunun yerine, yapılan işlemi mantıksal olarak geri alan yeni bir işlem gerçekleştirir. Örneğin “stok düş” işleminin compensating transaction’ı “stok artır” olur.

İki temel Saga implementasyon stili vardır:

  • Choreography (Koreografi): Servisler birbirleriyle doğrudan event’ler üzerinden konuşur, merkezi bir koordinatör yoktur.
  • Orchestration (Orkestrasyon): Merkezi bir saga orkestratörü hangi servisin ne zaman çalışacağını yönetir.

Choreography Tabanlı Saga

Choreography yaklaşımında her servis kendi event’lerini dinler ve kendi event’lerini yayınlar. Aşağıdaki sipariş örneğini RabbitMQ ile ele alalım:

# order_service.py - Sipariş servisi
import pika
import json
import uuid

def create_order(order_data):
    order_id = str(uuid.uuid4())
    # Lokal transaction: Siparişi PENDING durumunda kaydet
    db.orders.insert({
        "id": order_id,
        "status": "PENDING",
        "items": order_data["items"],
        "user_id": order_data["user_id"],
        "total": order_data["total"]
    })
    
    # Order Created event'i yayınla
    publish_event("order.created", {
        "order_id": order_id,
        "user_id": order_data["user_id"],
        "items": order_data["items"],
        "total": order_data["total"]
    })
    
    return order_id

def publish_event(routing_key, payload):
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('rabbitmq')
    )
    channel = connection.channel()
    channel.exchange_declare(
        exchange='saga_events', 
        exchange_type='topic',
        durable=True
    )
    channel.basic_publish(
        exchange='saga_events',
        routing_key=routing_key,
        body=json.dumps(payload),
        properties=pika.BasicProperties(
            delivery_mode=2,  # Kalıcı mesaj
            message_id=str(uuid.uuid4())
        )
    )
    connection.close()
# inventory_service.py - Stok servisi
import pika
import json

def handle_order_created(ch, method, properties, body):
    event = json.loads(body)
    order_id = event["order_id"]
    
    try:
        # Stok rezervasyonu dene
        for item in event["items"]:
            if not reserve_stock(item["product_id"], item["quantity"]):
                raise Exception(f"Yetersiz stok: {item['product_id']}")
        
        # Başarılı: inventory.reserved event'i yayınla
        publish_event("inventory.reserved", {
            "order_id": order_id,
            "user_id": event["user_id"],
            "total": event["total"]
        })
        ch.basic_ack(delivery_tag=method.delivery_tag)
        
    except Exception as e:
        # Başarısız: inventory.reserve_failed event'i yayınla
        publish_event("inventory.reserve_failed", {
            "order_id": order_id,
            "reason": str(e)
        })
        ch.basic_ack(delivery_tag=method.delivery_tag)

def start_consumer():
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('rabbitmq')
    )
    channel = connection.channel()
    channel.queue_declare(queue='inventory_queue', durable=True)
    channel.queue_bind(
        exchange='saga_events',
        queue='inventory_queue',
        routing_key='order.created'
    )
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(
        queue='inventory_queue',
        on_message_callback=handle_order_created
    )
    channel.start_consuming()

Choreography yaklaşımının güzel yanı sadeliğidir. Merkezi bir nokta yoktur, servisler birbirinden bağımsız ölçeklenir. Ancak iş akışı karmaşıklaştıkça hangi servisin ne zaman ne yaptığını anlamak zorlaşır. Ben buna “choreography spaghetti” diyorum, 5-6 servis geçtikten sonra event akışını kafanızda canlandırmak imkansız hale gelir.

Orchestration Tabanlı Saga

Orchestration yaklaşımında merkezi bir saga orkestratörü tüm akışı yönetir. Bu yaklaşım daha fazla kontrol sunar ve iş akışını tek bir yerde görünür kılar.

# saga_orchestrator.py - Sipariş Saga Orkestratörü
import json
import uuid
from enum import Enum

class OrderSagaState(Enum):
    STARTED = "STARTED"
    INVENTORY_RESERVING = "INVENTORY_RESERVING"
    PAYMENT_PROCESSING = "PAYMENT_PROCESSING"
    SHIPPING_SCHEDULING = "SHIPPING_SCHEDULING"
    COMPLETED = "COMPLETED"
    COMPENSATING = "COMPENSATING"
    FAILED = "FAILED"

class OrderSagaOrchestrator:
    def __init__(self, message_broker, saga_store):
        self.broker = message_broker
        self.store = saga_store
    
    def start_saga(self, order_data):
        saga_id = str(uuid.uuid4())
        
        # Saga durumunu kaydet
        saga = {
            "id": saga_id,
            "state": OrderSagaState.STARTED.value,
            "order_data": order_data,
            "compensations": []
        }
        self.store.save(saga)
        
        # İlk adımı başlat: Stok rezervasyonu
        self._request_inventory_reservation(saga_id, order_data)
        return saga_id
    
    def _request_inventory_reservation(self, saga_id, order_data):
        self._update_state(saga_id, OrderSagaState.INVENTORY_RESERVING)
        self.broker.send_command("inventory.reserve", {
            "saga_id": saga_id,
            "items": order_data["items"]
        })
    
    def handle_inventory_reserved(self, event):
        saga_id = event["saga_id"]
        saga = self.store.get(saga_id)
        
        # Compensation stack'e ekle
        saga["compensations"].append({
            "service": "inventory",
            "action": "release_reservation",
            "data": event
        })
        self.store.save(saga)
        
        # Sonraki adım: Ödeme
        self._update_state(saga_id, OrderSagaState.PAYMENT_PROCESSING)
        self.broker.send_command("payment.process", {
            "saga_id": saga_id,
            "user_id": saga["order_data"]["user_id"],
            "amount": saga["order_data"]["total"]
        })
    
    def handle_payment_failed(self, event):
        saga_id = event["saga_id"]
        saga = self.store.get(saga_id)
        
        # Compensating transaction'ları çalıştır
        self._update_state(saga_id, OrderSagaState.COMPENSATING)
        self._execute_compensations(saga)
    
    def _execute_compensations(self, saga):
        # LIFO sırasıyla compensation'ları çalıştır
        for compensation in reversed(saga["compensations"]):
            self.broker.send_command(
                f"{compensation['service']}.{compensation['action']}",
                compensation["data"]
            )
        self._update_state(saga["id"], OrderSagaState.FAILED)
    
    def _update_state(self, saga_id, state):
        saga = self.store.get(saga_id)
        saga["state"] = state.value
        self.store.save(saga)
# payment_service.py - Ödeme servisi (komut dinleyici)
import pika
import json

class PaymentCommandHandler:
    def handle_process_payment(self, ch, method, properties, body):
        command = json.loads(body)
        saga_id = command["saga_id"]
        
        try:
            # Ödeme işlemi
            transaction_id = self.payment_gateway.charge(
                user_id=command["user_id"],
                amount=command["amount"]
            )
            
            # Başarı: Orkestratöre bildir
            self.publish_reply("payment.processed", {
                "saga_id": saga_id,
                "transaction_id": transaction_id
            })
            
        except InsufficientFundsError as e:
            # Başarısız: Orkestratöre bildir
            self.publish_reply("payment.failed", {
                "saga_id": saga_id,
                "reason": "INSUFFICIENT_FUNDS",
                "details": str(e)
            })
        
        ch.basic_ack(delivery_tag=method.delivery_tag)

Idempotency ve Exactly-Once Semantics

Saga implementasyonlarında en kritik konulardan biri idempotency’dir. Mesaj kuyruklarında at-least-once delivery garantisi bulunduğundan, aynı mesaj birden fazla kez işlenebilir. Bu durumda operasyonlarınız idempotent olmalıdır.

# idempotent_handler.py
import hashlib

class IdempotentCommandHandler:
    def __init__(self, db, redis_client):
        self.db = db
        self.redis = redis_client
    
    def handle_with_idempotency(self, message_id, handler_func, *args, **kwargs):
        """
        Redis ile idempotency kontrolü.
        Aynı message_id daha önce işlendiyse tekrar işleme.
        """
        lock_key = f"processed_message:{message_id}"
        
        # SET NX (Set if Not eXists) ile atomik kontrol
        if not self.redis.set(lock_key, "1", nx=True, ex=86400):
            # Bu mesaj daha önce işlenmiş, skip et
            print(f"Mesaj zaten işlendi: {message_id}")
            return None
        
        try:
            result = handler_func(*args, **kwargs)
            return result
        except Exception as e:
            # Hata durumunda kilit kaldır, yeniden işlenebilsin
            self.redis.delete(lock_key)
            raise e

# Kullanım örneği
def reserve_stock_idempotent(message_id, order_id, items):
    handler = IdempotentCommandHandler(db, redis)
    return handler.handle_with_idempotency(
        message_id,
        _do_reserve_stock,
        order_id,
        items
    )

Saga Durumunu Kalıcı Hale Getirmek

Saga orkestratörünün kendisi de çökebilir. Bu yüzden saga durumunun dayanıklı bir şekilde saklanması şart. Aşağıda PostgreSQL ile saga state management örneği var:

-- saga_instances tablosu
CREATE TABLE saga_instances (
    id UUID PRIMARY KEY,
    saga_type VARCHAR(100) NOT NULL,
    state VARCHAR(50) NOT NULL,
    payload JSONB NOT NULL,
    compensations JSONB DEFAULT '[]',
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW(),
    version INTEGER DEFAULT 1  -- Optimistic locking için
);

-- Saga geçmişi için audit trail
CREATE TABLE saga_events (
    id SERIAL PRIMARY KEY,
    saga_id UUID REFERENCES saga_instances(id),
    event_type VARCHAR(100) NOT NULL,
    event_data JSONB,
    occurred_at TIMESTAMP DEFAULT NOW()
);

CREATE INDEX idx_saga_state ON saga_instances(state);
CREATE INDEX idx_saga_created ON saga_instances(created_at);
# saga_store.py - PostgreSQL tabanlı Saga Store
import psycopg2
import json

class PostgresSagaStore:
    def save(self, saga):
        with self.db.cursor() as cur:
            cur.execute("""
                INSERT INTO saga_instances 
                    (id, saga_type, state, payload, compensations, version)
                VALUES (%s, %s, %s, %s, %s, %s)
                ON CONFLICT (id) DO UPDATE SET
                    state = EXCLUDED.state,
                    payload = EXCLUDED.payload,
                    compensations = EXCLUDED.compensations,
                    updated_at = NOW(),
                    version = saga_instances.version + 1
                WHERE saga_instances.version = %s
            """, (
                saga["id"],
                saga["type"],
                saga["state"],
                json.dumps(saga["order_data"]),
                json.dumps(saga["compensations"]),
                saga.get("version", 1),
                saga.get("version", 1)  -- Optimistic lock kontrolü
            ))
            
            if cur.rowcount == 0:
                raise ConcurrentModificationError(
                    f"Saga {saga['id']} başka bir işlem tarafından güncellendi"
                )
            
            # Audit log kaydet
            cur.execute("""
                INSERT INTO saga_events (saga_id, event_type, event_data)
                VALUES (%s, %s, %s)
            """, (saga["id"], "STATE_CHANGED", json.dumps({"new_state": saga["state"]})))
            
        self.db.commit()

Timeout ve Retry Stratejileri

Gerçek dünyada servisler geç yanıt verebilir ya da hiç yanıt vermeyebilir. Saga’nın askıda kalmaması için timeout mekanizması şart:

# saga_timeout_handler.py
import schedule
import time

class SagaTimeoutWatcher:
    def __init__(self, saga_store, orchestrator, timeout_minutes=30):
        self.store = saga_store
        self.orchestrator = orchestrator
        self.timeout = timeout_minutes
    
    def check_stuck_sagas(self):
        """Belirli bir süre ilerleyemeyen saga'ları tespit et."""
        stuck_sagas = self.store.find_stuck_sagas(
            timeout_minutes=self.timeout,
            states=["INVENTORY_RESERVING", "PAYMENT_PROCESSING", "SHIPPING_SCHEDULING"]
        )
        
        for saga in stuck_sagas:
            print(f"Timeout tespit edildi - Saga: {saga['id']}, "
                  f"Durum: {saga['state']}")
            
            if saga.get("retry_count", 0) < 3:
                # Yeniden dene
                self.orchestrator.retry_current_step(saga["id"])
                self.store.increment_retry(saga["id"])
            else:
                # Maksimum deneme aşıldı, compensate et
                self.orchestrator.compensate(saga["id"])
    
    def start(self):
        schedule.every(5).minutes.do(self.check_stuck_sagas)
        while True:
            schedule.run_pending()
            time.sleep(30)

Gözlemlenebilirlik: Saga’yı İzlemek

Saga akışlarını izlemek için distributed tracing kritik öneme sahiptir. Tüm servisler arasında correlation ID’yi taşıdığınızdan emin olun:

# tracing_middleware.py
from opentelemetry import trace
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

class SagaTracingMiddleware:
    def __init__(self, service_name):
        self.tracer = trace.get_tracer(service_name)
        self.propagator = TraceContextTextMapPropagator()
    
    def process_message(self, message_headers, handler_func, *args):
        # Parent context'i mesaj header'larından al
        context = self.propagator.extract(message_headers)
        
        with self.tracer.start_as_current_span(
            "saga.step.process",
            context=context,
            attributes={
                "saga.id": message_headers.get("saga_id"),
                "saga.step": message_headers.get("saga_step"),
                "messaging.system": "rabbitmq"
            }
        ) as span:
            try:
                result = handler_func(*args)
                span.set_status(trace.Status(trace.StatusCode.OK))
                return result
            except Exception as e:
                span.record_exception(e)
                span.set_status(trace.Status(trace.StatusCode.ERROR))
                raise

Choreography mu, Orchestration mu?

Bu sorunun cevabı projenin ihtiyaçlarına bağlı, ama şöyle bir rehber çerçeve işe yarayabilir:

Choreography tercih edin:

  • Akış basit ve servis sayısı az (2-3 servis)
  • Servisler arasında gevşek bağlantı kritik
  • İş akışı sıklıkla değişmiyor

Orchestration tercih edin:

  • Karmaşık iş akışları ve koşullu adımlar var
  • İş akışını merkezi olarak izlemeniz gerekiyor
  • Timeout ve retry mantığı kritik
  • Birden fazla ekip farklı servisleri yönetiyor

Pratikte hybrid yaklaşım da sık görülür: Bir domain içinde choreography, domain’ler arası saga’lar için orchestration.

Yaygın Tuzaklar

Saga implementasyonlarında sıklıkla yapılan hatalar şunlardır:

  • Compensating transaction’ları test etmemek: Mutluyol senaryolarını test ederken hata senaryolarını ihmal etmek. Chaos engineering ile compensation akışlarını düzenli test edin.
  • Saga state’ini uçucu bellekte tutmak: Orkestratör çöktüğünde tüm saga durumu kaybedilir. Daima kalıcı store kullanın.
  • Idempotency’yi görmezden gelmek: At-least-once delivery ile çalışan sistemlerde idempotency olmadan veri tutarsızlıkları kaçınılmazdır.
  • Çok ince granüler saga adımları: Her küçük işlemi ayrı saga adımı yapmak gereksiz karmaşıklık yaratır. Lokal transaction sınırlarını doğru çizin.
  • Compensation mantığını değiştirmek: Üretimdeki bir saga’nın compensation mantığı değiştirilirse, devam eden saga’lar yanlış compensate edilebilir. Versiyon yönetimi şart.

Sonuç

Saga Pattern, dağıtık sistemlerde veri tutarlılığını yönetmenin en pragmatik yollarından biridir. 2PC’nin getirdiği performans ve güvenilirlik sorunlarını eventual consistency modeline taşıyarak aşar. Ancak bu rahatlığın bir bedeli vardır: complexity. İdempotency, compensating transaction tasarımı, timeout yönetimi ve gözlemlenebilirlik… Bunların hepsi dikkatli düşünülmesi gereken konular.

Sisteminize Saga entegre etmeye başlamadan önce şu soruları kendinize sorun: Compensating transaction’larım mantıksal olarak doğru mu? Her adım gerçekten idempotent mi? Saga state’im çökme senaryolarında güvende mi? Akışı end-to-end izleyebilecek misiniz?

Bu soruların cevapları evet ise, Saga Pattern mikroservis mimarinizde dağıtık transaction yönetimini çözülebilir bir problem haline getirecektir. Choreography ile basit başlayın, karmaşıklık arttıkça orchestration’a geçişi değerlendirin. Ve her zaman hata senaryolarını production öncesi kapsamlı test edin; mutluyol testleri sizi burada kurtarmaz.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir