Redis Pub/Sub Nedir ve Mesaj Kuyruğundan Farkı

Bir üretim ortamında Redis kullanmaya başladığınızda, er ya da geç şu soruyla karşılaşırsınız: “Pub/Sub mu kullansam, yoksa bir mesaj kuyruğu mu?” Bu soruyu ilk duyduğumda ben de kafam karışmıştı. İkisi de mesajlaşma ile ilgili, ikisi de Redis üzerinde çalışıyor, ama aralarındaki fark düşündüğünüzden çok daha derin. Yanlış seçim, production’da ciddi baş ağrılarına yol açabiliyor.

Bu yazıda Redis Pub/Sub’ın gerçekte ne yaptığını, klasik mesaj kuyruklarından nasıl ayrıştığını ve hangi senaryoda hangisini seçmeniz gerektiğini somut örneklerle ele alacağım.

Redis Pub/Sub Nasıl Çalışır?

Redis Pub/Sub, yayın-abone modelinin (publish-subscribe pattern) Redis üzerindeki implementasyonudur. Temel mantık şu: bir yayıncı (publisher) bir kanala mesaj gönderir, o kanala abone olan tüm dinleyiciler (subscriber) bu mesajı anlık olarak alır.

Burada kritik olan kelime anlık. Mesaj iletimi gerçek zamanlıdır ve Redis bu mesajı hiçbir yerde depolamaz. Bir abone o an bağlı değilse, mesajı sonsuza kadar kaçırır. Redis bu mesajı “bekletmez”, bir yerde saklamaz, tekrar göndermez.

Basit bir örnekle başlayalım:

# Terminal 1 - Abone ol
redis-cli SUBSCRIBE haber-kanali

# Terminal 2 - Mesaj yayınla
redis-cli PUBLISH haber-kanali "Sistem güncellemesi tamamlandı"

Terminal 1’de şunu görürsünüz:

1) "subscribe"
2) "haber-kanali"
3) (integer) 1
# Mesaj geldiğinde:
1) "message"
2) "haber-kanali"
3) "Sistem güncellemesi tamamlandı"

Gayet temiz görünüyor. Ama şimdi Terminal 1’i kapatın, Terminal 2’den birkaç mesaj gönderin, sonra Terminal 1’i tekrar açıp abone olun. O mesajları göremezsiniz. Gitti, bitti. Bu basit gerçek, Pub/Sub ile mesaj kuyruğu arasındaki temel ayrımın ta kendisidir.

Pattern-Based Subscription

Redis Pub/Sub’ın güzel özelliklerinden biri pattern tabanlı abone olmaktır. Tek tek kanal adı vermek yerine wildcard kullanabilirsiniz:

# "log-" ile başlayan tüm kanallara abone ol
redis-cli PSUBSCRIBE "log-*"

# Başka bir terminalden farklı kanallara mesaj gönder
redis-cli PUBLISH log-nginx "404 hatası tespit edildi"
redis-cli PUBLISH log-mysql "Bağlantı havuzu doldu"
redis-cli PUBLISH log-app "Deploy tamamlandı"

Bu özellik özellikle çok sayıda servisin log’larını merkezi bir yerden dinlemek istediğinizde işe yarar. Microservice mimarisinde her servis kendi kanalına yazarken, merkezi bir monitoring servisi hepsini PSUBSCRIBE ile dinleyebilir.

Klasik Mesaj Kuyruğu Ne Fark Yaratır?

Redis üzerinde mesaj kuyruğu implementasyonu için tarihsel olarak üç farklı yöntem kullanılmıştır: LIST veri yapısı, Sorted Sets ve modern Redis Streams. Her birinin kendine özgü avantajları var ama konsept olarak hepsi aynı temel garantiyi sağlar: mesaj kaybolmaz.

LIST tabanlı basit bir kuyruk örneği:

# Producer - Kuyruğa mesaj ekle
redis-cli LPUSH is-kuyrugu '{"job_id": "123", "type": "email", "to": "[email protected]"}'
redis-cli LPUSH is-kuyrugu '{"job_id": "124", "type": "sms", "to": "+905551234567"}'

# Consumer - Kuyruktan mesaj al (blocking)
redis-cli BRPOP is-kuyrugu 0

Burada mesaj, bir consumer onu alana kadar Redis’te bekler. Consumer çökmüş olsa bile mesaj kaybolmaz. Consumer yeniden başladığında kuyruktaki mesajları işlemeye devam eder.

Bunu Pub/Sub ile karşılaştırın: Pub/Sub’da consumer çöktüğünde, o süre zarfında gönderilen tüm mesajlar kaybolur. Mesaj kuyruğunda consumer ne zaman gelirse gelsin, kuyruktaki işi alır.

Redis Streams ile Modern Yaklaşım

Redis 5.0 ile gelen Streams, mesaj kuyruğu için çok daha gelişmiş bir yapı sunuyor. Consumer grupları, mesaj onaylama (ACK) mekanizması ve kalıcı depolama bir arada geliyor:

# Stream'e mesaj ekle
redis-cli XADD siparis-stream "*" musteri_id "456" urun "laptop" adet "1"

# Consumer grubu oluştur
redis-cli XGROUP CREATE siparis-stream siparis-isleyiciler $ MKSTREAM

# Consumer olarak mesaj al
redis-cli XREADGROUP GROUP siparis-isleyiciler worker-1 COUNT 1 BLOCK 0 STREAMS siparis-stream ">"

# Mesajı işledikten sonra onayla
redis-cli XACK siparis-stream siparis-isleyiciler 1234567890123-0

ACK mekanizması burada kritik. Bir worker mesajı alır ama işleyemeden çökerse, mesaj “pending” durumunda kalır ve başka bir worker tarafından tekrar alınabilir. Pub/Sub’da böyle bir güvence yoktur.

Gerçek Dünya Senaryoları

Teorik bilgi güzel ama asıl mesele hangi durumda ne kullanacağınızı bilmek. Karşılaştığım gerçek senaryolar üzerinden gidelim.

Senaryo 1: Canlı Bildirim Sistemi (Pub/Sub için ideal)

Bir e-ticaret platformu düşünün. Sipariş verildiğinde müşteriye anlık bildirim göndermek istiyorsunuz. Müşteri o an sitede değilse bildirimi kaçırır, bu kabul edilebilir. Çünkü zaten database’e kayıt düşüyorsunuz, kullanıcı bir dahaki girişinde sipariş durumunu görebilecek.

# Sipariş servisi - bildirim yayınla
redis-cli PUBLISH kullanici-bildirimleri '{"user_id": "789", "msg": "Siparişiniz alındı", "order_id": "SIP-001"}'

# Frontend WebSocket servisi - bildirimleri dinle
redis-cli SUBSCRIBE kullanici-bildirimleri

Burada Pub/Sub mantıklı çünkü: Anlık iletim önemli, mesajın kalıcı olması gerekmiyor, aynı mesajı birden fazla servis (bildirim servisi, analytics servisi, log servisi) aynı anda alabilmeli.

Senaryo 2: E-posta Gönderim Kuyruğu (Mesaj kuyruğu için zorunlu)

Aynı e-ticaret platformunda kullanıcılara toplu e-posta göndereceğinizi düşünün. 50.000 e-posta göndereceksiniz ve bu işlem dakikalar alacak. Burada Pub/Sub kullanırsanız felaket olur. E-posta gönderim worker’ınız bir an duraksamış, yeniden başlatılmış veya deploy sürecinde kısa bir süre kapalı kalmışsa, o sırada kuyruğa eklenen e-postalar sonsuza kadar kaybolur.

# E-posta kuyruğuna ekle
redis-cli XADD email-kuyrugu "*" 
    alici "[email protected]" 
    konu "Siparişiniz kargoya verildi" 
    template "kargo-bildirimi" 
    siparis_id "SIP-001"

# Worker - e-posta al ve işle
redis-cli XREADGROUP GROUP email-workers worker-1 
    COUNT 10 BLOCK 5000 
    STREAMS email-kuyrugu ">"

Mesaj kuyruğu burada zorunlu çünkü: Her e-posta mutlaka gönderilmeli, işlenmeyen mesajların tekrar denenmesi lazım, kaç e-postanın beklemede olduğunu takip etmeniz gerekiyor.

Senaryo 3: Cache Invalidation (Pub/Sub’ın parlayan anı)

Microservice mimarisinde cache invalidation klasik bir sorundur. Ürün fiyatı güncellendiğinde, tüm servislerin kendi local cache’lerini temizlemesi gerekir. Bu tam olarak Pub/Sub’ın shine ettiği alan.

# Ürün servisi fiyat güncelledi
redis-cli PUBLISH cache-invalidate '{"entity": "product", "id": "PRD-456", "action": "update"}'

# Tüm servisler aynı anda bu mesajı alır
# Sepet servisi dinliyor
# Öneri motoru dinliyor
# Arama servisi dinliyor

Eğer bu senaryoda mesaj kuyruğu kullansaydınız, her servisin kendi kuyruğunu oluşturmanız gerekirdi. Her mesaj kopyalanarak her servise iletilirdi. Pub/Sub burada hem daha basit hem de daha verimli.

Python ile Pratik Implementasyon

Teoriden çıkıp kodla görelim. Python’da redis-py kütüphanesi kullanarak her iki yaklaşımı da uygulayalım.

Önce Pub/Sub subscriber:

# redis-py kurulumu
pip install redis
import redis
import json
import threading

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def pub_sub_listener():
    pubsub = r.pubsub()
    pubsub.subscribe('sistem-olaylari')
    
    print("Sistem olayları dinleniyor...")
    
    for message in pubsub.listen():
        if message['type'] == 'message':
            data = json.loads(message['data'])
            print(f"Olay alındı: {data['event']} - {data['timestamp']}")
            # Burada mesajı işle
            # UYARI: İşleme süresi uzarsa mesaj kaybolabilir!

# Publisher tarafı
def publish_event(event_type, details):
    payload = json.dumps({
        'event': event_type,
        'details': details,
        'timestamp': '2024-01-15T10:30:00'
    })
    subscriber_count = r.publish('sistem-olaylari', payload)
    print(f"Mesaj {subscriber_count} aboneye iletildi")

# Thread'de çalıştır
t = threading.Thread(target=pub_sub_listener)
t.daemon = True
t.start()

# Test mesajı yayınla
publish_event('deploy', {'version': 'v2.1.0', 'env': 'production'})

Şimdi aynı işi Redis Streams ile mesaj kuyruğu olarak yapalım:

import redis
import json
import time

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

STREAM_NAME = 'sistem-olaylari-stream'
GROUP_NAME = 'olay-isleyiciler'
CONSUMER_NAME = 'worker-1'

# Consumer grubu oluştur (yoksa)
try:
    r.xgroup_create(STREAM_NAME, GROUP_NAME, id='0', mkstream=True)
except redis.exceptions.ResponseError as e:
    if 'BUSYGROUP' not in str(e):
        raise

def stream_consumer():
    while True:
        # Mesaj al - 5 saniye bekle
        messages = r.xreadgroup(
            GROUP_NAME, 
            CONSUMER_NAME,
            {STREAM_NAME: '>'},
            count=10,
            block=5000
        )
        
        if messages:
            for stream, msg_list in messages:
                for msg_id, data in msg_list:
                    try:
                        print(f"İşleniyor: {msg_id} - {data}")
                        # Mesajı işle
                        time.sleep(0.1)  # İş simülasyonu
                        
                        # Başarıyla işlendi, onayla
                        r.xack(STREAM_NAME, GROUP_NAME, msg_id)
                        print(f"Onaylandı: {msg_id}")
                        
                    except Exception as e:
                        print(f"Hata: {msg_id} işlenemedi - {e}")
                        # ACK göndermiyoruz, mesaj pending kalacak

def stream_producer(event_type, details):
    msg_id = r.xadd(STREAM_NAME, {
        'event': event_type,
        'details': json.dumps(details),
        'timestamp': str(time.time())
    })
    print(f"Mesaj eklendi: {msg_id}")
    return msg_id

# Mesaj üret
stream_producer('deploy', {'version': 'v2.1.0', 'env': 'production'})
stream_producer('alert', {'severity': 'high', 'message': 'CPU %95'})

# Tüketici başlat
stream_consumer()

Bu iki kod bloğunu yan yana koyduğunuzda fark net ortaya çıkıyor. Streams versiyonunda xack çağrısı var ve hata durumunda ACK gönderilmiyor. Bu, mesajın kaybolmamasını garanti ediyor.

Pending Mesajları Yönetmek

Üretim ortamında mutlaka karşılaşacağınız bir durum: worker çöktü, bir sürü pending mesaj kaldı. Bunları nasıl kurtarırsınız?

# Pending mesajları listele
redis-cli XPENDING siparis-stream siparis-isleyiciler - + 10

# Belirli bir consumer'ın pending mesajlarını listele
redis-cli XPENDING siparis-stream siparis-isleyiciler - + 10 worker-1

# 60 saniyeden uzun süredir işlenmeyen mesajları başka worker'a aktar
redis-cli XCLAIM siparis-stream siparis-isleyiciler worker-2 60000 1234567890123-0

Pub/Sub’da böyle bir kurtarma mekanizması yoktur. Mesaj gitti mi, gitti.

Ne Zaman Hangisini Seçmelisiniz?

Bu soruya net bir cevap verebilmek için kendinize şu soruları sormalısınız:

Pub/Sub tercih edin eğer:

  • Mesajın kalıcı olması gerekmiyorsa, anlık iletim yeterliyse
  • Aynı mesajın birden fazla servise eş zamanlı iletilmesi gerekiyorsa
  • Bir mesajın işlenip işlenmediğini takip etmenize gerek yoksa
  • Broadcast senaryosu söz konusuysa (cache invalidation, canlı bildirim, chat gibi)
  • Consumer sayısı dinamik olarak değişiyorsa ve her birinin kendi state’i yoksa

Mesaj kuyruğu (Streams veya LIST) tercih edin eğer:

  • Mesajın kesinlikle işlenmesi gerekiyorsa, kayıp kabul edilemezse
  • İşlenmeyen mesajların tekrar denenmesi gerekiyorsa
  • Mesaj geçmişine ihtiyaç duyabilirseniz
  • Birden fazla worker arasında yük dengeleme yapmanız gerekiyorsa
  • İşlem hızı producer ile consumer arasında uyumsuzsa (backpressure yönetimi)

Monitoring ve Operasyonel İpuçları

Production’da her iki sistemi de izlemek için bazı pratik Redis komutları:

# Pub/Sub kanallarını ve abone sayılarını listele
redis-cli PUBSUB CHANNELS "*"
redis-cli PUBSUB NUMSUB kanal-adi-1 kanal-adi-2

# Aktif pattern subscription sayısı
redis-cli PUBSUB NUMPAT

# Stream bilgilerini görüntüle
redis-cli XINFO STREAM siparis-stream

# Consumer grup detayları
redis-cli XINFO GROUPS siparis-stream

# Stream uzunluğu
redis-cli XLEN siparis-stream

# Stream'i belirli boyutta tut (eski mesajları sil)
redis-cli XTRIM siparis-stream MAXLEN 100000

Stream büyüklüğünü kontrol altında tutmak için MAXLEN parametresini XADD ile birlikte kullanabilirsiniz:

# Maksimum 100.000 mesaj tut, eskilerini otomatik sil
redis-cli XADD siparis-stream "MAXLEN" "~" "100000" "*" musteri_id "123"

~ işareti yaklaşık kırpma anlamına geliyor, Redis performans açısından tam sayıya uymak yerine buna yakın bir noktadan kesiyor. Kesin sayı istiyorsanız ~ olmadan kullanın ama bu daha maliyetli.

Sık Yapılan Hatalar

Hata 1: Pub/Sub’ı kritik işlemler için kullanmak

En çok gördüğüm hata bu. “Hızlı ve basit” diye Pub/Sub seçiliyor ama ödeme bildirimleri veya stok güncellemeleri gibi kritik mesajlar için kullanılıyor. Worker birkaç saniye duraklaması halinde mesajlar kaybolup gitmiş oluyor.

Hata 2: Streams’de ACK yapmayı unutmak

ACK mekanizması varken kullanmamak. Mesajı alıyorsunuz, işliyorsunuz ama XACK çağırmıyorsunuz. Zamanla pending listesi şişiyor, memory tüketiyor ve operasyonel kargaşa çıkıyor.

Hata 3: Stream boyutunu sınırlamamak

MAXLEN kullanmadan bırakılan stream’ler zamanla onlarca GB memory tüketebilir. Mutlaka bir retention politikası belirleyin.

Hata 4: Tek consumer ile Streams kullanmak

Eğer tek bir consumer kullanıyorsanız ve consumer group özelliklerine ihtiyaç duymuyorsanız, XREAD yeterli. Consumer group ve ACK mekanizması birden fazla worker çalıştırdığınızda anlam kazanıyor.

Sonuç

Redis Pub/Sub ve mesaj kuyruğu birbirinin alternatifi değil, farklı problemlerin çözümü. Pub/Sub “şu an kim dinliyorsa mesajı alsın” mantığıyla çalışırken, mesaj kuyruğu “mesaj mutlaka işlensin, ne zaman olursa olsun” garantisi veriyor.

Basit kural olarak şunu aklınızda tutabilirsiniz: Mesajın kaybolması durumunda sadece “o an için güncel bilgi gösterilmemiş olur” gibi küçük bir etki olacaksa Pub/Sub uygundur. Ama mesajın kaybolması durumunda para kaybı, eksik işlem veya veri tutarsızlığı oluşacaksa, Redis Streams veya başka bir kalıcı mesaj kuyruğu sistemi kullanın.

Production’da ikisini birlikte kullanmaktan çekinmeyin. Örneğin bir sipariş geldiğinde hem Streams’e kaydedip işleme koyabilir, hem de Pub/Sub üzerinden gerçek zamanlı dashboard’u güncelleyebilirsiniz. Redis her iki modeli de aynı instance üzerinde destekliyor, bu esnekliği değerlendirin.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir