Redis Streams ile Kalıcı Mesaj Kuyruğu Oluşturma

Mesaj kuyruğu konusuna gelince çoğu ekip hâlâ “Redis Pub/Sub yeter mi?” sorusunu soruyor. Kısa cevap: hayır, yetmez. Pub/Sub’ın en büyük sorunu kalıcılık olmaması. Bir consumer bağlı değilken gelen mesajlar kaybolur, bitti. Ama Redis 5.0 ile gelen Streams yapısı bu denklemi tamamen değiştirdi. Bugün gerçek bir prodüksiyon senaryosu üzerinden Redis Streams’i derinlemesine inceleyeceğiz.

Redis Streams Neden Pub/Sub’dan Farklı

Pub/Sub’ı uzun yıllar kullandım. Basit bildirim sistemleri için hâlâ işe yarıyor, yanlış anlaşılmasın. Ama şu senaryoyu düşünün: bir e-ticaret platformunda sipariş işleme servisi çöktü, mesajlar gelmeye devam etti ve servis ayağa kalktığında hiçbir şeyi göremedi. İşte bu yüzden Kafka’ya geçiş tartışmaları başladı ekiplerimizde. Oysa Redis Streams ile aynı kalıcılık garantisini çok daha düşük operasyonel maliyetle elde edebiliyorsunuz.

Streams’in öne çıkan özellikleri:

  • Kalıcı depolama: Mesajlar Redis’in belleğinde saklanır, consumer’lar istedikleri zaman okuyabilir
  • Consumer Groups: Birden fazla consumer aynı stream’i bölerek işleyebilir, mesajlar tekrarlanmaz
  • ACK mekanizması: Consumer mesajı aldıktan sonra işlediğini onaylar, onaylanmayan mesajlar pending listesinde kalır
  • Replay yeteneği: Belirli bir ID’den itibaren geçmiş mesajları tekrar okuyabilirsiniz
  • Mesaj ID’leri: Her mesaj benzersiz bir timestamp-sequence ID alır, sıralama garantisi vardır

Temel Kavramlar ve Veri Modeli

Redis Streams’i anlamak için önce mesaj ID yapısını kavramak gerekiyor. Her mesaj şu formatta bir ID alır: millisekondzaman-sırano. Örneğin 1703001234567-0. Bu ID’nin hem sıralama hem de deduplication açısından önemi büyük.

Stream üzerine mesaj eklemek için XADD komutunu kullanırsınız:

# Basit mesaj ekleme
redis-cli XADD orders '*' event_type "order_created" order_id "12345" customer_id "987" amount "299.90"

# Çıktı örneği: "1703001234567-0"

# Stream boyutunu sınırlamak için MAXLEN
redis-cli XADD orders MAXLEN '~' 10000 '*' event_type "order_created" order_id "12346"

# Belirli ID ile ekleme (genellikle migration senaryolarında)
redis-cli XADD orders '1703001234568-0' event_type "payment_received" order_id "12345"

MAXLEN '~' kullanımındaki tilde işareti önemli, yaklaşık sınır anlamına geliyor ve Redis’in içten optimizasyonla daha verimli çalışmasını sağlıyor. Kesin 10000 yerine ~10000 demek performans açısından daha iyi.

Consumer Group Oluşturma

Production’da her zaman Consumer Group kullanın. Direkt stream okumak sadece replay senaryolarında ya da monitoring amaçlı mantıklı.

# Consumer group oluşturma
# '$' = sadece şu andan itibaren gelen mesajlar
redis-cli XGROUP CREATE orders order-processors '$' MKSTREAM

# '0' = stream'in başından itibaren tüm mesajlar
redis-cli XGROUP CREATE orders order-processors-audit '0' MKSTREAM

# MKSTREAM: Stream yoksa otomatik oluştur

# Group bilgilerini görme
redis-cli XINFO GROUPS orders

MKSTREAM parametresi çok işime yarıyor çünkü deployment sırasında consumer başlatma sırasını kafaya takmak zorunda kalmıyorsunuz. Consumer group var mı yok mu diye kontrol etmeden otomatik oluşturuyor.

Mesaj Okuma ve ACK Mekanizması

İşin kritik kısmına geldik. Consumer Group’tan mesaj okurken XREADGROUP kullanıyorsunuz:

# Consumer group'tan mesaj okuma
# '>' özel ID'si = daha önce hiç teslim edilmemiş mesajlar
redis-cli XREADGROUP GROUP order-processors consumer-1 COUNT 10 BLOCK 2000 STREAMS orders '>'

# Çıktı:
# 1) 1) "orders"
#    2) 1) 1) "1703001234567-0"
#          2) 1) "event_type"
#             2) "order_created"
#             3) "order_id"
#             4) "12345"

# Mesajı işledikten sonra ACK gönderme
redis-cli XACK orders order-processors 1703001234567-0

# Birden fazla mesajı tek seferde ACK'lama
redis-cli XACK orders order-processors 1703001234567-0 1703001234568-0 1703001234569-0

BLOCK 2000 parametresi 2 saniye bekleyip mesaj yoksa boş döner. 0 verirseniz sonsuza kadar bekler, bu da uzun polling için kullanışlı ama dikkatli olun, bağlantı timeout’larıyla sorun yaşayabilirsiniz.

Pending Mesajları Yönetme

Asıl güç burada. Bir consumer mesajı aldı ama işlerken çöktü. O mesaj pending listesine düşer ve kaybolmaz.

# Pending mesajları listele
redis-cli XPENDING orders order-processors - + 10

# Belirli consumer'ın pending mesajları
redis-cli XPENDING orders order-processors - + 10 consumer-1

# Çıktı her mesaj için şunları gösterir:
# - Mesaj ID
# - Hangi consumer'da
# - Ne kadar süredir pending (millisaniye)
# - Kaç kez teslim edildi (delivery count)

# Başka bir consumer'a pending mesajı transfer etme (XCLAIM)
# 60000ms = 1 dakikadan fazladır işlenmeyen mesajları al
redis-cli XCLAIM orders order-processors consumer-2 60000 1703001234567-0

# Otomatik claim ve okuma (Redis 6.2+)
redis-cli XAUTOCLAIM orders order-processors consumer-2 60000 0-0 COUNT 10

XAUTOCLAIM Redis 6.2 ile geldi ve hayat kurtarıcı. Önceden bu işi manuel yapmak zorundaydınız, script yazıp her X dakikada bir çalıştırıyordunuz.

Gerçek Dünya Senaryosu: Python ile Order Processing Servisi

Teoriden pratiğe geçelim. E-ticaret sistemimizde sipariş işleme servisinin nasıl göründüğüne bakalım:

import redis
import json
import time
import signal
import logging
from typing import Optional

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class OrderProcessor:
    def __init__(self, redis_url: str, stream_name: str, group_name: str, consumer_name: str):
        self.redis = redis.from_url(redis_url, decode_responses=True)
        self.stream = stream_name
        self.group = group_name
        self.consumer = consumer_name
        self.running = True
        self._ensure_group_exists()
        
        signal.signal(signal.SIGTERM, self._shutdown)
        signal.signal(signal.SIGINT, self._shutdown)
    
    def _ensure_group_exists(self):
        try:
            self.redis.xgroup_create(
                self.stream, 
                self.group, 
                id='0',
                mkstream=True
            )
            logger.info(f"Consumer group '{self.group}' oluşturuldu")
        except redis.exceptions.ResponseError as e:
            if "BUSYGROUP" in str(e):
                logger.info(f"Consumer group '{self.group}' zaten mevcut")
            else:
                raise
    
    def process_message(self, message_id: str, data: dict) -> bool:
        """
        Gerçek iş mantığı buraya gelir.
        True: başarılı, False: yeniden dene
        """
        try:
            event_type = data.get('event_type')
            order_id = data.get('order_id')
            
            logger.info(f"İşleniyor: {event_type} - Order: {order_id}")
            
            if event_type == 'order_created':
                # Stok kontrolü, ödeme işleme vb.
                time.sleep(0.1)  # Simülasyon
                logger.info(f"Sipariş {order_id} işlendi")
                return True
            elif event_type == 'payment_received':
                # Kargo başlatma, bildirim gönderme
                logger.info(f"Ödeme {order_id} için kargo başlatıldı")
                return True
            else:
                logger.warning(f"Bilinmeyen event tipi: {event_type}")
                return True  # Bilinmeyen mesajları ACK'la, döngüye girme
                
        except Exception as e:
            logger.error(f"Mesaj işleme hatası {message_id}: {e}")
            return False
    
    def claim_stale_messages(self, min_idle_ms: int = 60000):
        """60 saniyeden fazla pending kalan mesajları yakala"""
        try:
            result = self.redis.xautoclaim(
                self.stream,
                self.group,
                self.consumer,
                min_idle_ms,
                '0-0',
                count=10
            )
            messages = result[1] if result else []
            if messages:
                logger.info(f"{len(messages)} adet stale mesaj claim edildi")
            return messages
        except Exception as e:
            logger.error(f"Claim hatası: {e}")
            return []
    
    def run(self):
        logger.info(f"Consumer başladı: {self.consumer}")
        
        while self.running:
            # Önce stale mesajları kontrol et
            stale = self.claim_stale_messages()
            for msg_id, msg_data in stale:
                if self.process_message(msg_id, msg_data):
                    self.redis.xack(self.stream, self.group, msg_id)
            
            # Yeni mesajları oku
            try:
                messages = self.redis.xreadgroup(
                    self.group,
                    self.consumer,
                    {self.stream: '>'},
                    count=10,
                    block=2000
                )
                
                if not messages:
                    continue
                    
                for stream_name, stream_messages in messages:
                    for msg_id, msg_data in stream_messages:
                        if self.process_message(msg_id, msg_data):
                            self.redis.xack(self.stream, self.group, msg_id)
                        else:
                            logger.warning(f"Mesaj {msg_id} ACK'lanmadı, pending'de kalacak")
                            
            except redis.exceptions.ConnectionError as e:
                logger.error(f"Redis bağlantı hatası: {e}, yeniden bağlanıyor...")
                time.sleep(5)
    
    def _shutdown(self, signum, frame):
        logger.info("Kapatma sinyali alındı, mevcut işlemler tamamlanıyor...")
        self.running = False


if __name__ == '__main__':
    processor = OrderProcessor(
        redis_url='redis://localhost:6379',
        stream_name='orders',
        group_name='order-processors',
        consumer_name='consumer-1'
    )
    processor.run()

Producer Tarafı

Mesaj üreticisi genellikle daha basit, ama dikkat edilmesi gereken noktalar var:

import redis
import json
import uuid
from datetime import datetime

class OrderEventProducer:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url, decode_responses=True)
        self.stream = 'orders'
    
    def publish_order_created(self, order_id: str, customer_id: str, amount: float) -> str:
        message = {
            'event_type': 'order_created',
            'order_id': order_id,
            'customer_id': customer_id,
            'amount': str(amount),
            'timestamp': datetime.utcnow().isoformat(),
            'idempotency_key': str(uuid.uuid4())
        }
        
        msg_id = self.redis.xadd(
            self.stream,
            message,
            maxlen=50000,  # Tilde ile yaklaşık sınır
            approximate=True
        )
        
        return msg_id
    
    def publish_batch(self, events: list) -> list:
        """Pipeline ile toplu mesaj gönderimi"""
        pipe = self.redis.pipeline()
        for event in events:
            pipe.xadd(self.stream, event, maxlen=50000, approximate=True)
        return pipe.execute()


producer = OrderEventProducer('redis://localhost:6379')
msg_id = producer.publish_order_created('ORD-12345', 'CUS-987', 299.90)
print(f"Mesaj yayınlandı: {msg_id}")

Pipeline kullanımı toplu gönderimde ciddi performans farkı yaratıyor. 1000 mesajı teker teker göndermek yerine pipeline ile göndermek genellikle 5-10 kat daha hızlı.

Stream Monitoring ve Yönetim

Prodüksiyonda neyin döndüğünü bilmek zorundasınız:

# Stream genel bilgileri
redis-cli XINFO STREAM orders

# Detaylı bilgi (full flag ile son mesajları da gösterir)
redis-cli XINFO STREAM orders FULL COUNT 5

# Consumer group detayları
redis-cli XINFO GROUPS orders

# Belirli bir group'taki consumer'lar
redis-cli XINFO CONSUMERS orders order-processors

# Stream uzunluğu
redis-cli XLEN orders

# Belirli aralıktaki mesajları okuma (monitoring/debug için)
redis-cli XRANGE orders - + COUNT 10
redis-cli XRANGE orders 1703001234567-0 + COUNT 10

# Ters sıra okuma
redis-cli XREVRANGE orders + - COUNT 5

# Eski mesajları temizleme (belirli ID'den öncesini sil)
redis-cli XTRIM orders MINID 1703001234567-0

Özellikle XINFO CONSUMERS çıktısında dikkat etmeniz gereken değer pel-count, yani o consumer’ın kaç tane pending mesajı var. Bu değer sürekli artıyorsa o consumer sağlıklı değil demektir.

Dead Letter Queue Implementasyonu

Bazı mesajlar sürekli hata veriyorsa sonsuza kadar retry etmek istemezsiniz. Belirli deneme sayısından sonra dead letter queue’ya almanız gerekir:

# Dead letter stream oluşturma ve problematik mesajları oraya taşıma
# (Bu işlemi uygulama kodunuzda yaparsınız)

# Delivery count kontrolü XPENDING çıktısında geliyor
# 5'ten fazla denendiyse dead letter'a taşı

redis-cli XADD orders-dead-letter '*' 
  original_stream "orders" 
  original_id "1703001234567-0" 
  event_type "order_created" 
  order_id "12345" 
  failure_reason "payment_service_unavailable" 
  retry_count "5" 
  failed_at "2024-01-15T10:30:00Z"

# Ardından orijinal mesajı ACK'la (pending'den çıkar)
redis-cli XACK orders order-processors 1703001234567-0

Dead letter queue’yu ayrı bir consumer group ile düzenli aralıklarla tüketerek hem alert üretebilir hem de manuel müdahale gerekip gerekmediğine bakabilirsiniz.

Redis Sentinel ve Cluster ile Kullanım

Tek node Redis prodüksiyon için yeterli değil. Sentinel veya Cluster kullanıyorsanız bağlantı konfigürasyonunu doğru yapmanız şart:

from redis.sentinel import Sentinel

# Sentinel konfigürasyonu
sentinel = Sentinel(
    [
        ('sentinel-1', 26379),
        ('sentinel-2', 26379),
        ('sentinel-3', 26379)
    ],
    socket_timeout=0.5,
    password='your-redis-password'
)

# Master bağlantısı (yazma işlemleri)
master = sentinel.master_for('mymaster', socket_timeout=0.5, decode_responses=True)

# Slave bağlantısı (okuma - monitoring için kullanabilirsiniz)
slave = sentinel.slave_for('mymaster', socket_timeout=0.5, decode_responses=True)

# Artık master üzerinden stream işlemleri yapabilirsiniz
master.xadd('orders', {'event_type': 'order_created', 'order_id': '12345'})

Failover senaryolarında consumer’larınızın bağlantı hatalarını yakalayıp retry etmesi kritik. Yukarıdaki Python örneğindeki ConnectionError handling tam bu yüzden orada.

Performans Tuning

Prodüksiyonda karşılaştığım ve çözdüğüm birkaç performans sorununu paylaşayım:

  • COUNT değerini doğru seçin: Çok küçük COUNT (örneğin 1) çok fazla round-trip yaratır, çok büyük COUNT (örneğin 10000) işleme süresini uzatır ve ACK gecikmesine yol açar. Genellikle 50-200 arası iyi bir başlangıç noktası
  • BLOCK timeout’u ayarlayın: 0 (sonsuz bekleme) yerine makul bir timeout (2000-5000ms) kullanın ve loop’ta yeniden çağırın, bu şekilde stale message kontrolü ve graceful shutdown yapabilirsiniz
  • MAXLEN ile stream büyümesini kontrol edin: MAXLEN olmadan stream sonsuza kadar büyür, bellek şişer. approximate=True ile tilde kullanmayı unutmayın
  • Pipeline kullanın: Toplu XACK işlemlerinde pipeline ciddi fark yaratır, her mesaj için ayrı round-trip yerine batch ACK yapın
  • Consumer sayısını partition olarak düşünün: Consumer group’taki consumer sayısı arttıkça throughput artar, ama her consumer’ın kendi işlem kapasitesini de hesaba katın

Sonuç

Redis Streams, Kafka’nın operasyonel karmaşıklığını kaldıramayan ama Pub/Sub’ın yetersiz kaldığı o ara bölgede gerçekten parlayan bir çözüm. Özellikle şu senaryolarda kesinlikle değerlendirin:

  • Mesaj kaybı toleransınız sıfır ama tam teşekküllü Kafka cluster kurmak istemiyorsunuz
  • Zaten Redis altyapınız var ve yeni bir servis eklemekten kaçınmak istiyorsunuz
  • Consumer’larınızın bağımsız olarak scale etmesi gerekiyor
  • Geçmiş mesajları replay edebilmek istiyorsunuz

ACK mekanizması ve Consumer Groups kombinasyonu, benim gördüğüm kadarıyla pek çok ekibin Kafka’dan Redis Streams’e geçiş yapmasının temel nedeni. “At-least-once delivery” garantisiyle pending mesaj yönetimi, dead letter queue ve stale message recovery mekanizmalarını Redis’in basit operasyonel modeli üzerinde çalıştırmak gerçekten büyük bir avantaj.

Tabii her çözümün sınırları var. Multi-datacenter replikasyon, çok yüksek hacimli workload’lar (milyonlarca mesaj/saniye) veya uzun süreli mesaj retention gerektiren senaryolarda Kafka daha uygun olabilir. Ama büyük çoğunluğunun ihtiyacı için Redis Streams fazlasıyla yeterli ve production’da güvenle kullanabilirsiniz.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir