Redis Streams ile Event Sourcing Mimarisi
Bir sistemde “ne oldu” sorusunun cevabını tutmak, “şu an ne var” sorusunun cevabından çok daha değerli olduğunu anlamak biraz zaman alıyor. Event sourcing’i ilk duyduğumda akademik bir kavram gibi geldi. Ama üretim ortamında bir veri tutarsızlığını çözmek için saatlerce log karıştırdıktan sonra, olayların kendisini saklamak yerine sadece son durumu tutmanın ne kadar kör bir yaklaşım olduğunu anladım. Redis Streams tam bu noktada devreye giriyor.
Redis Streams Nedir, Neden Önemli?
Redis’in 5.0 sürümüyle gelen Streams veri yapısı, klasik Pub/Sub’dan temelden farklı. Pub/Sub’da bir mesajı kaçırırsanız, o mesaj sonsuza kadar gider. Streams’de ise her mesaj kalıcı olarak saklanır, her mesajın benzersiz bir ID’si vardır ve consumer group’lar sayesinde birden fazla tüketici aynı stream’den koordineli biçimde okuma yapabilir.
Event sourcing mimarisinin özü şudur: Uygulamanın durumunu doğrudan kaydetmek yerine, o duruma yol açan olayları sırayla kaydedersiniz. Kullanıcının bakiyesi 500 TL değil; 1000 TL yatırıldı, 300 TL çekildi, 200 TL harcandı olaylarının toplamı 500 TL’dir. Bu yaklaşım audit trail, replay kabiliyeti ve temporal query gibi özellikleri bedavaya getirir.
Redis Streams bu iki konsepti birleştirmek için son derece uygun bir zemin sunuyor. Hızlı, kalıcı (AOF/RDB ile), consumer group destekli ve özellikle mikroservis mimarilerinde event bus görevi görmesi için tasarlanmış gibi.
Temel Kavramlar
Koda girmeden önce birkaç kavramı netleştirmek gerekiyor:
- XADD: Stream’e yeni bir olay ekler
- XREAD: Stream’den okuma yapar, blocking modda bekleyebilir
- XREADGROUP: Consumer group üzerinden okuma yapar
- XACK: Bir mesajın işlendiğini onaylar
- XPENDING: Onaylanmamış mesajları listeler
- XRANGE/XREVRANGE: Belirli bir ID aralığında mesajları sorgular
- XLEN: Stream’deki toplam mesaj sayısını döner
Stream ID formatı - şeklindedir. Örneğin 1699876543210-0. Bu ID’ler zaman sıralı olduğu için temporal query’ler kurmak çok kolaylaşır.
İlk Event’i Yazmak
Basit bir e-ticaret senaryosuyla başlayalım. Sipariş olaylarını bir stream’e yazacağız:
# Sipariş oluşturuldu olayı
XADD order:events *
event_type "ORDER_CREATED"
order_id "ORD-2024-001"
user_id "USR-456"
total_amount "1250.00"
currency "TRY"
timestamp "2024-01-15T10:30:00Z"
# Ödeme alındı olayı
XADD order:events *
event_type "PAYMENT_RECEIVED"
order_id "ORD-2024-001"
payment_method "credit_card"
amount "1250.00"
transaction_id "TXN-789"
# Kargo gönderildi olayı
XADD order:events *
event_type "ORDER_SHIPPED"
order_id "ORD-2024-001"
cargo_company "yurtici"
tracking_number "YK123456789TR"
* kullandığımızda Redis otomatik ID üretir. Deterministik ID’ler için kendi ID’nizi de verebilirsiniz. Stream’in şu anki uzunluğunu kontrol edelim:
XLEN order:events
# (integer) 3
# Tüm olayları görüntüle (- ile + arasındaki her şey)
XRANGE order:events - +
Consumer Group Kurulumu
Tek bir servis okuyorsa XREAD yeterli. Ama birden fazla servis aynı olayları işleyecekse, hem her servisin tüm olayları görmesi hem de aynı olay türü için yük dağılımı gerekiyor. Consumer group’lar tam bunu çözüyor.
# Inventory servisi için consumer group oluştur
# $ işareti: "sadece bundan sonraki mesajları ver" anlamına gelir
# 0 kullanırsanız: stream'in başından itibaren tüm mesajları işler
XGROUP CREATE order:events inventory-service $ MKSTREAM
# Notification servisi için ayrı bir group
XGROUP CREATE order:events notification-service $ MKSTREAM
# Analytics servisi - baştan okusun
XGROUP CREATE order:events analytics-service 0 MKSTREAM
Şimdi her servis kendi consumer instance’larıyla okuma yapabilir:
# inventory-service'in worker-1 instance'ı
# COUNT 10: maksimum 10 mesaj al
# BLOCK 2000: mesaj yoksa 2 saniye bekle
XREADGROUP GROUP inventory-service worker-1
COUNT 10
BLOCK 2000
STREAMS order:events >
# > işareti: "henüz bu group'a assign edilmemiş yeni mesajları ver"
# 0 verirseniz: daha önce assign edilip ACK'lanmamış mesajları tekrar verir (recovery için kritik)
ACK Mekanizması ve Hata Toleransı
Bu kısım çoğu implementasyonda eksik kalıyor ve production’da can yakıyor. Worker bir mesajı aldığında, o mesaj PEL (Pending Entry List) adı verilen listede tutulur. İşlemi tamamladıktan sonra ACK göndermezseniz, Redis o mesajın hala işlenmekte olduğunu düşünür.
# Mesaj işlendi, onay gönder
XACK order:events inventory-service 1699876543210-0
# Onaylanmamış mesajları gör
XPENDING order:events inventory-service - + 10
# Belirli bir consumer'ın pending mesajları
XPENDING order:events inventory-service - + 10 worker-1
Bir worker çökerse ne olur? Diğer worker’lar XAUTOCLAIM ile sahipsiz kalan mesajları devralabilir:
# 30 saniyedir işlenmeyen mesajları worker-2'ye aktar
# MIN-IDLE-TIME milisaniye cinsinden (30000 = 30 saniye)
XAUTOCLAIM order:events inventory-service worker-2
30000 0-0 COUNT 10
Bu mekanizma sayesinde worker’lardan biri çökse bile mesajlar kaybolmuyor, başka bir worker devralıyor. Klasik Pub/Sub ile bunu yapmak neredeyse imkansız.
Python ile Gerçek Bir Event Sourcing Implementasyonu
Kavramlar yeterince netleşti, şimdi gerçek kodla bir şeyler inşa edelim. Redis Streams üzerinde basit ama production’a yakın bir event store:
import redis
import json
import uuid
from datetime import datetime
from typing import Dict, List, Optional
class EventStore:
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.client = redis.from_url(redis_url, decode_responses=True)
self.stream_prefix = "events:"
def append(self, aggregate_type: str, aggregate_id: str,
event_type: str, payload: Dict) -> str:
"""
Bir olayı event store'a yazar.
Returns: Redis tarafından atanan event ID'si
"""
stream_key = f"{self.stream_prefix}{aggregate_type}"
event_data = {
"event_id": str(uuid.uuid4()),
"event_type": event_type,
"aggregate_id": aggregate_id,
"aggregate_type": aggregate_type,
"payload": json.dumps(payload),
"metadata": json.dumps({
"timestamp": datetime.utcnow().isoformat(),
"version": "1.0"
})
}
# MAXLEN ile stream boyutunu sınırla (opsiyonel)
# ~ kullanımı approximate trimming için, performans açısından daha iyi
event_id = self.client.xadd(
stream_key,
event_data,
maxlen=100000,
approximate=True
)
return event_id
def get_events(self, aggregate_type: str,
aggregate_id: Optional[str] = None,
from_id: str = "-",
to_id: str = "+") -> List[Dict]:
"""
Belirli bir aggregate için tüm olayları getirir.
aggregate_id verilirse filtreler, verilmezse tüm olayları döner.
"""
stream_key = f"{self.stream_prefix}{aggregate_type}"
raw_events = self.client.xrange(stream_key, from_id, to_id)
events = []
for event_id, fields in raw_events:
event = {
"stream_id": event_id,
"event_id": fields["event_id"],
"event_type": fields["event_type"],
"aggregate_id": fields["aggregate_id"],
"payload": json.loads(fields["payload"]),
"metadata": json.loads(fields["metadata"])
}
if aggregate_id is None or event["aggregate_id"] == aggregate_id:
events.append(event)
return events
def rebuild_state(self, aggregate_type: str, aggregate_id: str) -> Dict:
"""
Tüm olayları replay ederek güncel state'i hesaplar.
"""
events = self.get_events(aggregate_type, aggregate_id)
state = {"aggregate_id": aggregate_id, "version": 0}
for event in events:
state = self._apply_event(state, event)
state["version"] += 1
return state
def _apply_event(self, state: Dict, event: Dict) -> Dict:
"""Event type'a göre state'i günceller."""
event_type = event["event_type"]
payload = event["payload"]
if event_type == "ORDER_CREATED":
state.update({
"order_id": payload["order_id"],
"user_id": payload["user_id"],
"status": "CREATED",
"total_amount": payload["total_amount"],
"items": payload.get("items", [])
})
elif event_type == "PAYMENT_RECEIVED":
state["status"] = "PAID"
state["payment_info"] = payload
elif event_type == "ORDER_SHIPPED":
state["status"] = "SHIPPED"
state["tracking"] = payload.get("tracking_number")
elif event_type == "ORDER_DELIVERED":
state["status"] = "DELIVERED"
elif event_type == "ORDER_CANCELLED":
state["status"] = "CANCELLED"
state["cancel_reason"] = payload.get("reason")
return state
# Kullanım örneği
if __name__ == "__main__":
store = EventStore()
order_id = "ORD-2024-001"
# Olayları yaz
store.append("order", order_id, "ORDER_CREATED", {
"order_id": order_id,
"user_id": "USR-456",
"total_amount": 1250.00,
"items": [{"sku": "LAPTOP-001", "qty": 1, "price": 1250.00}]
})
store.append("order", order_id, "PAYMENT_RECEIVED", {
"transaction_id": "TXN-789",
"amount": 1250.00,
"method": "credit_card"
})
# State'i rebuild et
current_state = store.rebuild_state("order", order_id)
print(json.dumps(current_state, indent=2, ensure_ascii=False))
Snapshot Stratejisi
Her sorgu için tüm olayları replay etmek, event sayısı arttıkça kabul edilemez bir gecikmeye yol açar. Snapshot mekanizması bu sorunu çözer: belirli aralıklarla o anki state’i kaydedersiniz, sonraki sorgularda snapshot’tan başlarsınız.
# Snapshot için ayrı bir key kullan
# Her 100 eventte bir snapshot al
SET snapshot:order:ORD-2024-001 '{"status":"PAID","version":47,"last_event_id":"1699876543210-0"}'
EX 86400 # 24 saat TTL, ihtiyaca göre ayarla
# Snapshot metadata'sını ayrı tut
HSET snapshot:meta:order:ORD-2024-001
version 47
last_event_id "1699876543210-0"
created_at "2024-01-15T10:30:00Z"
Python implementasyonunda rebuild_state metodunu snapshot’ı kullanacak şekilde güncelleyelim:
def rebuild_state_with_snapshot(self, aggregate_type: str,
aggregate_id: str) -> Dict:
"""
Önce snapshot'ı kontrol eder, varsa oradan başlar.
"""
snapshot_key = f"snapshot:{aggregate_type}:{aggregate_id}"
meta_key = f"snapshot:meta:{aggregate_type}:{aggregate_id}"
snapshot_data = self.client.get(snapshot_key)
if snapshot_data:
state = json.loads(snapshot_data)
last_event_id = self.client.hget(meta_key, "last_event_id")
# Snapshot'tan sonraki olayları getir
# last_event_id'yi exclusive yapmak için özel format kullan
recent_events = self.get_events(
aggregate_type,
aggregate_id,
from_id=f"({last_event_id}", # exclusive range
to_id="+"
)
for event in recent_events:
state = self._apply_event(state, event)
state["version"] = state.get("version", 0) + 1
return state
else:
# Snapshot yoksa baştan hesapla
return self.rebuild_state(aggregate_type, aggregate_id)
Consumer Group ile Projeksiyon Oluşturma
Event sourcing’in güzel yanlarından biri, aynı olaylardan farklı projeksiyonlar oluşturabilmek. Örneğin sipariş olaylarından hem envanter güncellemesi hem de müşteri bildirimi üretebilirsiniz:
# Her servis için consumer group kur
XGROUP CREATE events:order inventory-projection 0 MKSTREAM
XGROUP CREATE events:order notification-projection 0 MKSTREAM
XGROUP CREATE events:order analytics-projection 0 MKSTREAM
# inventory-projection worker'ı çalıştır
# Sadece ORDER_CREATED ve ORDER_CANCELLED olaylarıyla ilgileniyor
while true; do
MSGS=$(redis-cli XREADGROUP GROUP inventory-projection worker-1
COUNT 50 BLOCK 1000 STREAMS events:order ">")
if [ -n "$MSGS" ]; then
# Her mesajı işle
echo "$MSGS" | process_inventory_events.sh
# Başarıyla işlenenler için ACK gönder
# (process script başarısız olursa ACK gönderme, retry mekanizması devreye girer)
fi
done
Temporal Query: Geçmişe Gitmek
Event sourcing’in en güçlü özelliklerinden biri: herhangi bir andaki state’i yeniden hesaplayabilmek. “15 Ocak saat 10:30’da bu siparişin durumu neydi?” sorusuna cevap verebilirsiniz.
# 15 Ocak 2024 10:30 UTC = 1705315800000 milisaniye (epoch)
# Bu zamana kadar olan olayları getir
XRANGE events:order - 1705315800000-9999999
# Belirli iki zaman aralığındaki olaylar
XRANGE events:order 1705315800000-0 1705316400000-9999999
# Son 100 olayı getir (debug için çok işe yarıyor)
XREVRANGE events:order + - COUNT 100
Bu özelliği gerçek hayatta kullandım: Bir muhasebe sisteminde tutarsız bakiye sorununu çözmek için, şüpheli işlemin gerçekleştiği saatteki state’i rebuild ederek hatanın tam olarak nerede oluştuğunu tespit ettik. Klasik CRUD yaklaşımda bunu yapmanın imkanı yoktu.
Monitoring ve Operasyonel Konular
Production’da Redis Streams kullanırken takip etmeniz gereken birkaç kritik metrik:
# Stream bilgilerini görüntüle
XINFO STREAM events:order
# Consumer group detayları - pending mesaj sayısına dikkat
XINFO GROUPS events:order
# Belirli bir group'un consumer'larını gör
XINFO CONSUMERS events:order inventory-projection
# Stream boyutunu kontrol et (memory yönetimi için önemli)
XLEN events:order
# Memory kullanımı
MEMORY USAGE events:order
# Sıkışmış pending mesajları bul (1 saatten uzun süredir işlenmeyen)
XPENDING events:order inventory-projection - + 100
Stream boyutu yönetimi için birkaç strateji:
- MAXLEN ile XADD: Her yazma işleminde otomatik trim, en basit yöntem
- XTRIM: Manuel olarak belirli bir boyuta trim et
- TTL tabanlı arşivleme: Eski olayları Redis’ten alıp S3 veya başka bir cold storage’a taşı
- Snapshot + trim kombinasyonu: Snapshot aldıktan sonra o noktaya kadar olan olayları sil
# Stream'i 50000 entry ile sınırla
XTRIM events:order MAXLEN ~ 50000
# Belirli bir ID'ye kadar olan olayları sil
XTRIM events:order MINID 1699876543210-0
Yaygın Hatalar ve Çözümleri
Birkaç yıllık production deneyiminden derlediğim tuzaklar:
- ACK’sız consumer: En sık görülen hata. PEL şişer, memory artar, performans düşer. Her mesajı işledikten sonra mutlaka XACK gönderin.
- Çok büyük payload: Her event’e tüm nesneyi koymak yerine sadece değişen alanları saklayın. Stream’in bir event log olduğunu unutmayın, bir object store değil.
- Snapshot almadan sürekli rebuild: Küçük projelerde sorun olmaz, ama event sayısı 10.000’i geçince ciddi gecikme yaşarsınız.
- Consumer group silip yeniden oluşturma: Group silindiğinde o group’un PEL’i de gider. Bir mesaj kayıplığının önüne geçmek için dikkatli olun.
- Blocking okuma timeout’u:
BLOCK 0sonsuza kadar bekler. Network sorunlarında connection’ı kaybedip mesaj kayıplığı yaşayabilirsiniz. Makul bir timeout değeri koyun.
Sonuç
Redis Streams, event sourcing için gerçekten olgun bir çözüm haline geldi. Consumer group mekanizması, kalıcı depolama, temporal query kabiliyeti ve yüksek throughput bir arada sunuluyor. Kafka kadar karmaşık bir kurulum gerektirmiyor, RabbitMQ kadar ephemeral değil.
Tabii her şey için doğru araç değil. Milyonlarca event’ı yıllarca saklamanız gerekiyorsa Kafka veya Event Store gibi dedicated çözümlere bakın. Ama mikroservis mimarisinde servisler arası event tabanlı iletişim ve makul boyutlu event store ihtiyacı için Redis Streams son derece pragmatik bir seçim.
Özellikle mevcut Redis altyapınız varsa ek bir bileşen eklemeden event sourcing’e geçebilirsiniz. Bu, operasyonel karmaşıklığı düşürür ve ekibin öğrenme eğrisini kısaltır. Denemek için küçük, yan kritik bir servisle başlayın, mekanizmaları içselleştirin, sonra kritik servislere taşıyın.
