Outbox Pattern ile Güvenilir Mesaj Yayınlama
Dağıtık sistemlerde “en az bir kez mesaj gönderildi” garantisi vermek, kulağa basit gelir ama pratikte sizi sabah 3’te uyandıracak türden bir problemdir. Veritabanına yazdınız, Kafka’ya veya RabbitMQ’ya mesaj gönderdiniz – ya ikisi arasında bir şey ters giderse? İşte tam bu noktada Outbox Pattern devreye girer ve bu yazıda konuyu hem teorik hem de çalışan kod örnekleriyle ele alacağız.
Sorun Ne?
Klasik senaryoyu düşünün: Bir e-ticaret sisteminde sipariş oluşturulduğunda hem veritabanına kayıt atılacak hem de “OrderCreated” eventi yayınlanacak. Çoğu geliştirici şunu yazar:
def create_order(order_data):
# DB'ye yaz
order = db.session.add(Order(**order_data))
db.session.commit()
# Kafka'ya gönder
kafka_producer.send('orders', {'event': 'OrderCreated', 'order_id': order.id})
Bu kod iki farklı şekilde patlar:
- Senaryo 1: DB commit başarılı, Kafka’ya gönderirken network koptu. Sipariş oluştu ama event gitmedi. Downstream servisler habersiz.
- Senaryo 2: Kafka’ya gönderdiniz, DB rollback oldu (başka bir hata yüzünden). Event gitti ama sipariş yok. Downstream servisler var olmayan sipariş için işlem yapmaya çalışıyor.
Distributed transaction kullansak? XA transactions teoride çözüm ama pratikte hem performans cehennemi hem de modern mesaj brokerlarının çoğu desteklemiyor. Outbox Pattern bu ikilemin zarif çözümü.
Outbox Pattern Nedir?
Temel fikir şu: Mesajı doğrudan broker’a göndermek yerine, aynı veritabanı transaction’ı içinde bir “outbox” tablosuna yazarsınız. Ayrı bir süreç (poller veya CDC) bu tabloyu okuyup broker’a iletir ve başarılı olunca kaydı işaretler.
Bu sayede veritabanı atomik transaction garantisi size business verisi ile mesajın birlikte yazıldığını ya da ikisinin de yazılmadığını garanti eder. Broker’a gönderme ise ayrı ve yeniden denenebilir bir süreçtir.
Üç temel bileşen var:
- Outbox tablosu: Gönderilecek mesajları tutan DB tablosu
- İş mantığı: Normal transaction içinde hem business kaydını hem outbox kaydını yazar
- Message relay: Outbox’ı okuyup broker’a ileten arka plan süreci
Outbox Tablosu Tasarımı
PostgreSQL için örnek bir outbox tablosu:
CREATE TABLE outbox_messages (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
processed_at TIMESTAMP WITH TIME ZONE,
retry_count INTEGER DEFAULT 0,
status VARCHAR(50) DEFAULT 'PENDING',
error_message TEXT
);
CREATE INDEX idx_outbox_status_created
ON outbox_messages(status, created_at)
WHERE status = 'PENDING';
CREATE INDEX idx_outbox_aggregate
ON outbox_messages(aggregate_type, aggregate_id);
Burada birkaç tasarım kararı önemli:
- aggregate_type ve aggregate_id: Hangi entity’nin eventi olduğunu takip etmenizi sağlar. Sipariş eventleri için sıralama garantisi istiyorsanız bu alanları Kafka partition key olarak kullanırsınız.
- status alanı: PENDING, PROCESSING, SENT, FAILED gibi durumlar. Partial index ile sadece PENDING kayıtları taranır, tablo büyüdükçe performans korunur.
- retry_count: Kaç kez denediğinizi bilmeden dead letter mantığı kuramazsınız.
Python ile Temel Implementasyon
SQLAlchemy kullanan bir FastAPI uygulamasında sipariş oluşturma:
from sqlalchemy.orm import Session
from sqlalchemy import text
import uuid
import json
from datetime import datetime
class OrderService:
def __init__(self, db: Session):
self.db = db
def create_order(self, customer_id: str, items: list, total_amount: float):
"""
Sipariş oluşturma ve outbox'a event yazma tek transaction'da.
"""
order_id = str(uuid.uuid4())
try:
# Business kaydı oluştur
self.db.execute(
text("""
INSERT INTO orders (id, customer_id, total_amount, status, created_at)
VALUES (:id, :customer_id, :total_amount, 'PENDING', NOW())
"""),
{
"id": order_id,
"customer_id": customer_id,
"total_amount": total_amount
}
)
# Sipariş kalemleri
for item in items:
self.db.execute(
text("""
INSERT INTO order_items (order_id, product_id, quantity, price)
VALUES (:order_id, :product_id, :quantity, :price)
"""),
{**item, "order_id": order_id}
)
# Outbox'a event yaz - AYNI TRANSACTION İÇİNDE
event_payload = {
"order_id": order_id,
"customer_id": customer_id,
"items": items,
"total_amount": total_amount,
"occurred_at": datetime.utcnow().isoformat()
}
self.db.execute(
text("""
INSERT INTO outbox_messages
(id, aggregate_type, aggregate_id, event_type, payload)
VALUES
(:id, :aggregate_type, :aggregate_id, :event_type, :payload)
"""),
{
"id": str(uuid.uuid4()),
"aggregate_type": "Order",
"aggregate_id": order_id,
"event_type": "OrderCreated",
"payload": json.dumps(event_payload)
}
)
# Tek commit - ikisi birlikte ya yazar ya yazmaz
self.db.commit()
return order_id
except Exception as e:
self.db.rollback()
raise
Bu noktada dikkat: Outbox’a yazan kod ile business kodunun aynı session/connection’ı kullandığından emin olun. Farklı connection açarsanız farklı transaction olur, pattern’in bütün anlamı kaybolur.
Message Relay: Polling Yaklaşımı
Outbox tablosunu periyodik olarak tarayıp mesajları ileten servis:
import asyncio
import logging
from confluent_kafka import Producer
import json
logger = logging.getLogger(__name__)
class OutboxPoller:
def __init__(self, db_pool, kafka_producer: Producer, poll_interval: float = 1.0):
self.db_pool = db_pool
self.kafka_producer = kafka_producer
self.poll_interval = poll_interval
self.running = False
async def start(self):
self.running = True
logger.info("Outbox poller başlatıldı")
while self.running:
try:
await self._process_batch()
except Exception as e:
logger.error(f"Batch işleme hatası: {e}")
await asyncio.sleep(self.poll_interval)
async def _process_batch(self):
async with self.db_pool.acquire() as conn:
# SKIP LOCKED: Birden fazla poller instance'ı varsa çakışmayı önler
messages = await conn.fetch("""
SELECT id, aggregate_type, aggregate_id, event_type, payload, retry_count
FROM outbox_messages
WHERE status = 'PENDING'
AND retry_count < 5
ORDER BY created_at ASC
LIMIT 100
FOR UPDATE SKIP LOCKED
""")
for message in messages:
await self._process_message(conn, message)
async def _process_message(self, conn, message):
msg_id = message['id']
# PROCESSING olarak işaretle
await conn.execute("""
UPDATE outbox_messages
SET status = 'PROCESSING'
WHERE id = $1
""", msg_id)
try:
# Kafka topic'i event type'tan türet
topic = self._resolve_topic(message['aggregate_type'], message['event_type'])
# Partition key olarak aggregate_id kullan (sıralama garantisi)
self.kafka_producer.produce(
topic=topic,
key=message['aggregate_id'].encode('utf-8'),
value=message['payload'],
headers={
'event_type': message['event_type'],
'aggregate_type': message['aggregate_type']
}
)
self.kafka_producer.flush(timeout=5)
# Başarılı olarak işaretle
await conn.execute("""
UPDATE outbox_messages
SET status = 'SENT', processed_at = NOW()
WHERE id = $1
""", msg_id)
logger.info(f"Mesaj gönderildi: {msg_id}, event: {message['event_type']}")
except Exception as e:
# Hata durumunda retry_count artır
await conn.execute("""
UPDATE outbox_messages
SET status = 'PENDING',
retry_count = retry_count + 1,
error_message = $2
WHERE id = $1
""", msg_id, str(e))
logger.warning(f"Mesaj gönderilemedi: {msg_id}, hata: {e}, deneme: {message['retry_count'] + 1}")
def _resolve_topic(self, aggregate_type: str, event_type: str) -> str:
topic_map = {
('Order', 'OrderCreated'): 'orders.created',
('Order', 'OrderCancelled'): 'orders.cancelled',
('Payment', 'PaymentProcessed'): 'payments.processed',
}
return topic_map.get((aggregate_type, event_type), f"{aggregate_type.lower()}.events")
def stop(self):
self.running = False
FOR UPDATE SKIP LOCKED burada kritik. Birden fazla poller çalışıyorsa (ki yüksek yük durumunda çalıştırırsınız) aynı satırı iki poller birden almaz. Bu PostgreSQL’e özgü güzel bir özellik.
CDC Yaklaşımı: Debezium ile
Polling yerine Change Data Capture kullanmak hem daha verimli hem de daha gerçek zamanlı. Debezium, PostgreSQL’in WAL (Write-Ahead Log) akışını okuyarak outbox tablosundaki değişiklikleri yakalar.
Docker Compose ile hızlı kurulum:
version: '3.8'
services:
postgres:
image: postgres:15
environment:
POSTGRES_DB: orderdb
POSTGRES_USER: postgres
POSTGRES_PASSWORD: secret
command:
- "postgres"
- "-c"
- "wal_level=logical"
- "-c"
- "max_replication_slots=4"
- "-c"
- "max_wal_senders=4"
ports:
- "5432:5432"
zookeeper:
image: confluentinc/cp-zookeeper:7.4.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.4.0
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ports:
- "9092:9092"
debezium:
image: debezium/connect:2.4
depends_on:
- kafka
- postgres
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: debezium-group
CONFIG_STORAGE_TOPIC: debezium_configs
OFFSET_STORAGE_TOPIC: debezium_offsets
STATUS_STORAGE_TOPIC: debezium_status
ports:
- "8083:8083"
Debezium connector konfigürasyonu:
{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "secret",
"database.dbname": "orderdb",
"database.server.name": "orderdb",
"table.include.list": "public.outbox_messages",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.id": "id",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.type": "event_type",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.route.by.field": "aggregate_type",
"transforms.outbox.route.topic.replacement": "${routedByValue}.events"
}
}
Bu konfigürasyonla Debezium, outbox tablosuna her yeni kayıt eklendiğinde aggregate_type’a göre otomatik topic routing yapıyor. Order eventleri order.events topic’ine, Payment eventleri payment.events topic’ine gidiyor. Connector’ı kaydetmek için:
curl -X POST http://localhost:8083/connectors
-H "Content-Type: application/json"
-d @outbox-connector.json
# Connector durumunu kontrol et
curl http://localhost:8083/connectors/outbox-connector/status | python3 -m json.tool
# Lag durumunu izle
docker exec -it kafka kafka-consumer-groups.sh
--bootstrap-server localhost:9092
--describe
--group debezium-group
Temizlik ve Arşivleme
Outbox tablosu zamanla şişer. SENT durumundaki kayıtları ne yapacaksınız? İki seçenek var:
Silin, ama dikkatli silin. Binlerce kaydı tek seferde silmek tablo lock’una yol açar:
-- Cron job olarak çalıştır, gece 2'de
-- 7 günden eski, başarıyla gönderilmiş kayıtları batch batch sil
DO $$
DECLARE
deleted_count INTEGER;
total_deleted INTEGER := 0;
BEGIN
LOOP
DELETE FROM outbox_messages
WHERE id IN (
SELECT id FROM outbox_messages
WHERE status = 'SENT'
AND processed_at < NOW() - INTERVAL '7 days'
LIMIT 1000
FOR UPDATE SKIP LOCKED
);
GET DIAGNOSTICS deleted_count = ROW_COUNT;
total_deleted := total_deleted + deleted_count;
EXIT WHEN deleted_count = 0;
-- Veritabanını nefes aldır
PERFORM pg_sleep(0.1);
END LOOP;
RAISE NOTICE 'Toplam silinen: %', total_deleted;
END $$;
5 defadan fazla deneyip gönderilemeyenler için de ayrı bir süreç:
-- Dead letter: 5 kez denenip gönderilemeyenleri işaretle
UPDATE outbox_messages
SET status = 'DEAD_LETTER'
WHERE status = 'PENDING'
AND retry_count >= 5
AND created_at < NOW() - INTERVAL '1 hour';
İdempotent Consumer Olmak Zorunda
Outbox Pattern size “en az bir kez gönderim” garantisi verir, “tam bir kez” değil. Poller mesajı gönderdi ama DB’yi güncelleyemeden çöktü mü? Mesaj tekrar gönderilir. Dolayısıyla consumer tarafında idempotency zorunlu.
class OrderCreatedConsumer:
def __init__(self, db, processed_events_cache):
self.db = db
# Redis veya DB tablosu ile işlenmiş event ID'leri tut
self.cache = processed_events_cache
async def process(self, message):
event_id = message.headers().get('event_id')
# Daha önce işlendi mi?
if await self.cache.exists(f"processed:{event_id}"):
logger.info(f"Event zaten işlendi, atlanıyor: {event_id}")
return
payload = json.loads(message.value())
async with self.db.transaction():
# Business logic
await self._handle_order_created(payload)
# İşlenmiş olarak kaydet (transaction içinde)
await self.db.execute(
"INSERT INTO processed_events (event_id, processed_at) VALUES ($1, NOW())",
event_id
)
# Cache'e de ekle (DB'den okuma yükünü azaltmak için)
await self.cache.setex(f"processed:{event_id}", 86400, "1")
Monitoring ve Alerting
Outbox lag’i izlemezseniz bir sorun fark ettiğinizde çok geç olmuş olur:
# PENDING mesaj sayısı - bu metriği Prometheus'a gönder
psql -h localhost -U postgres -d orderdb -c "
SELECT
event_type,
COUNT(*) as pending_count,
MIN(created_at) as oldest_pending,
EXTRACT(EPOCH FROM (NOW() - MIN(created_at))) as max_lag_seconds
FROM outbox_messages
WHERE status = 'PENDING'
GROUP BY event_type
ORDER BY max_lag_seconds DESC;
"
# Dead letter oranı
psql -h localhost -U postgres -d orderdb -c "
SELECT
event_type,
COUNT(*) FILTER (WHERE status = 'DEAD_LETTER') as dead_count,
COUNT(*) as total_count,
ROUND(
COUNT(*) FILTER (WHERE status = 'DEAD_LETTER') * 100.0 / COUNT(*),
2
) as dead_letter_rate
FROM outbox_messages
WHERE created_at > NOW() - INTERVAL '24 hours'
GROUP BY event_type;
"
Alert kuralları için şu eşikleri başlangıç noktası olarak alabilirsiniz:
- 5 dakikadan eski PENDING mesaj varsa: Poller durmuş olabilir, uyarı ver
- DEAD_LETTER oranı %1’i aşarsa: Consumer veya broker tarafında yapısal bir sorun var
- PROCESSING durumunda 10 dakikadan fazla kalan kayıt: Poller yarıda kalmış, elle müdahale gerekebilir
Gerçek Dünya Notları
Birkaç ekip için bu pattern’i production’a taşırken öğrendiğim şeyleri paylaşayım.
Payload boyutu meselesi: Outbox tablosuna büyük payload’lar yazmak DB’yi şişirir. Eğer event payload’larınız düzenli olarak 100KB üzerine çıkıyorsa “Claim Check Pattern” ile birleştirin: Payload’ı S3/object storage’a yazın, outbox’a sadece referans koyun.
Sıralama garantisi: Kafka’da aynı partition key ile gönderilen mesajlar sıralı gelir. Aggregate ID’yi partition key olarak kullanırsanız aynı siparişin eventleri sıralı işlenir. Farklı siparişler arasında sıralama garantisi yoktur ve çoğu zaman gerekmez de.
Polling interval seçimi: 100ms polling her transaction için veritabanına yük demek. 5 saniye polling ise eventlerinizin 5 saniye gecikmeyle gitmesi demek. Debezium gibi CDC çözümleri bu trade-off’u ortadan kaldırır; WAL tabanlı olduğu için hem düşük latency hem de DB’ye minimal yük.
Transaction isolation: Outbox pattern’i uygularken READ COMMITTED isolation seviyesi genellikle yeterli. SERIALIZABLE kullanıyorsanız serialization hatalarına karşı retry mantığınızın olduğundan emin olun.
Sonuç
Outbox Pattern, dağıtık sistemlerde dual write probleminin pragmatik çözümü. “Ya veritabanı ya broker” ikilemine “ikisi de ya da hiçbiri” garantisi getiriyor. Pattern’in güzelliği veritabanının zaten sahip olduğu ACID garantisini yeniden kullanması.
Polling yaklaşımı küçük ve orta ölçekli sistemler için yeterli ve operasyonel olarak basit. Büyük ölçekte ve düşük latency gerektiren sistemlerde Debezium gibi CDC araçları hem daha verimli hem de daha güvenilir.
Unutmayın: Bu pattern “en az bir kez” semantiği verir. Consumer tarafında idempotency olmadan yarım kalır. İkisini birlikte uyguladığınızda elinizde gerçekten güvenilir bir mesaj altyapısı olur.
