Redis ile Pub/Sub Mesajlaşma Sistemi Kurulumu

Üretim ortamında çalışan bir sistemde anlık bildirimler, kullanıcıya gerçek zamanlı mesaj iletimi ya da mikroservisler arası hızlı iletişim kurman gerektiğinde Redis Pub/Sub aklına ilk gelen çözümlerden biri olmalı. Ben yıllarca RabbitMQ ve Kafka ile çalıştım, ama basit ve hızlı bir mesajlaşma katmanına ihtiyaç duyduğumda Redis’in Pub/Sub mekanizması çoğu zaman kazandı. Neden mi? Çünkü zaten Redis’i cache için kullanıyorsun, ekstra bir bileşen eklemiyorsun ve latency inanılmaz derecede düşük.

Bu yazıda Redis Pub/Sub’ı gerçekten anlamak, doğru yapılandırmak ve production’da güvenle kullanmak için ihtiyacın olan her şeyi anlatacağım.

Redis Pub/Sub Nedir ve Ne Zaman Kullanmalısın

Redis’in Pub/Sub sistemi klasik bir mesajlaşma modelidir: Publisher mesaj yayınlar, Subscriber o mesajı dinler. Aralarında herhangi bir kuyruk yok, mesajlar bellekte tutulmuyor. Bir subscriber bağlı değilse o mesajı kaçırır, bu önemli bir nokta.

Bu özellik hem güçlü yanı hem de zayıf yanı. Eğer sisteminizde şu ihtiyaçlar varsa Redis Pub/Sub biçilmiş kaftan:

  • Gerçek zamanlı bildirimler (kullanıcı online oldu, sipariş durumu değişti)
  • Cache invalidation sinyalleri (birden fazla uygulama sunucusu aynı cache’i kullanıyorsa)
  • Mikroservisler arası event broadcasting
  • Live dashboard güncellemeleri
  • Chat uygulamalarında basit mesajlaşma altyapısı

Eğer mesaj kalıcılığı, retry mekanizması ya da garantili iletim istiyorsan Redis Streams veya RabbitMQ tarafına bakman gerekir. Pub/Sub bunu sağlamaz, bekleme de.

Kurulum ve Temel Yapılandırma

Önce Redis’i kuralım. Ubuntu/Debian üzerinde:

sudo apt update
sudo apt install redis-server -y

# Servis durumunu kontrol et
sudo systemctl status redis-server

# Redis'e bağlan ve test et
redis-cli ping
# PONG çıktısı almalısın

Redis konfigürasyonuna bakalım. Pub/Sub için özel olarak ayarlaman gereken birkaç parametre var:

sudo nano /etc/redis/redis.conf

Şu satırları bulup düzenle:

# Dışarıdan bağlantıya izin vermek için (production'da dikkatli ol)
bind 127.0.0.1 192.168.1.100

# Maksimum client sayısı - Pub/Sub uzun süreli bağlantılar açar
maxclients 10000

# TCP keepalive - subscriber bağlantılarının kopup kopmadığını anlaman için kritik
tcp-keepalive 300

# Pub/Sub için timeout ayarı - 0 bırakırsan bağlantı hiç kapanmaz
# Subscriber'lar için genellikle 0 mantıklı
timeout 0

# Notify keyspace events - eğer keyspace notification da kullanacaksan
# notify-keyspace-events "KEA"

Değişiklikten sonra servisi yeniden başlat:

sudo systemctl restart redis-server
sudo systemctl enable redis-server

İlk Pub/Sub Testi: Redis CLI ile

Teoriden önce ellerin kirlenmeli. İki terminal aç:

Terminal 1 – Subscriber:

redis-cli
SUBSCRIBE haberler

# Çıktı:
# 1) "subscribe"
# 2) "haberler"
# 3) (integer) 1

Terminal 2 – Publisher:

redis-cli
PUBLISH haberler "Merhaba, bu ilk mesajımız!"

# Çıktı: (integer) 1
# Bu sayı kaç subscriber'ın mesajı aldığını gösterir

Terminal 1’de şunu görmelisin:

1) "message"
2) "haberler"
3) "Merhaba, bu ilk mesajımız!"

Basit ama güçlü. Şimdi birden fazla channel ile çalışalım. PSUBSCRIBE komutu pattern matching yapmanı sağlar:

# haber ile başlayan tüm channel'ları dinle
PSUBSCRIBE haber*

# veya belirli bir pattern
PSUBSCRIBE sistem:*:alert

Bu özelliği çok kullanıyorum. Örneğin kullanici:123:bildirim, kullanici:456:bildirim gibi dinamik channel isimlerini tek bir pattern ile dinleyebiliyorsun.

Python ile Gerçek Bir Pub/Sub Uygulaması

CLI ile oynamak güzel ama gerçek hayatta Python, Node.js veya Go kullanıyorsun. Python örneğiyle devam edelim. Önce kütüphaneyi kur:

pip install redis

Şimdi bir subscriber yazalım. Bu örnek e-ticaret sisteminde sipariş olaylarını dinleyen bir servis simülasyonu:

import redis
import json
import logging
import signal
import sys

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class SiparisSubscriber:
    def __init__(self, host='localhost', port=6379, db=0):
        self.redis_client = redis.Redis(
            host=host,
            port=port,
            db=db,
            decode_responses=True,
            socket_keepalive=True,
            socket_keepalive_options={},
            health_check_interval=30
        )
        self.pubsub = self.redis_client.pubsub()
        self.running = True
        
        # Graceful shutdown için signal handler
        signal.signal(signal.SIGINT, self._shutdown)
        signal.signal(signal.SIGTERM, self._shutdown)
    
    def _shutdown(self, signum, frame):
        logger.info("Kapatma sinyali alındı, bağlantılar temizleniyor...")
        self.running = False
        self.pubsub.unsubscribe()
        self.pubsub.close()
        sys.exit(0)
    
    def siparis_olusturuldu(self, mesaj):
        veri = json.loads(mesaj['data'])
        logger.info(f"Yeni sipariş: #{veri['siparis_id']} - Müşteri: {veri['musteri']}")
        # Buraya fatura oluşturma, email gönderme vb. eklenebilir
    
    def siparis_iptal_edildi(self, mesaj):
        veri = json.loads(mesaj['data'])
        logger.warning(f"Sipariş iptal: #{veri['siparis_id']} - Sebep: {veri.get('sebep', 'Belirtilmemiş')}")
    
    def dinle(self):
        # Her channel için ayrı handler tanımla
        self.pubsub.subscribe(**{
            'siparis:olusturuldu': self.siparis_olusturuldu,
            'siparis:iptal': self.siparis_iptal_edildi
        })
        
        logger.info("Sipariş kanalları dinleniyor...")
        
        # run_in_thread yerine manuel loop - daha fazla kontrol
        for mesaj in self.pubsub.listen():
            if not self.running:
                break
            if mesaj['type'] == 'message':
                # Handler zaten yukarıda tanımlı, otomatik çalışır
                pass

if __name__ == '__main__':
    subscriber = SiparisSubscriber()
    subscriber.dinle()

Publisher tarafı:

import redis
import json
from datetime import datetime

class SiparisPublisher:
    def __init__(self, host='localhost', port=6379, db=0):
        self.redis_client = redis.Redis(
            host=host,
            port=port,
            db=db,
            decode_responses=True
        )
    
    def siparis_yayinla(self, siparis_id, musteri, toplam):
        veri = {
            'siparis_id': siparis_id,
            'musteri': musteri,
            'toplam': toplam,
            'zaman': datetime.now().isoformat()
        }
        
        abone_sayisi = self.redis_client.publish(
            'siparis:olusturuldu',
            json.dumps(veri)
        )
        
        return abone_sayisi  # Kaç subscriber aldı?
    
    def siparis_iptal_et(self, siparis_id, sebep=None):
        veri = {
            'siparis_id': siparis_id,
            'sebep': sebep,
            'zaman': datetime.now().isoformat()
        }
        
        return self.redis_client.publish(
            'siparis:iptal',
            json.dumps(veri)
        )

# Kullanım
publisher = SiparisPublisher()
abone_sayisi = publisher.siparis_yayinla(
    siparis_id=12345,
    musteri="Ahmet Yılmaz",
    toplam=299.99
)
print(f"Mesaj {abone_sayisi} abone tarafından alındı")

Connection Pooling ve Production Ayarları

Tek bir bağlantıyla Pub/Sub kullanan sistemlerin sonunda sorun yaşadığına defalarca tanıklık ettim. Production’da connection pool şart:

import redis
from redis import ConnectionPool

# Pub/Sub için ayrı pool
pubsub_pool = ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=20,
    decode_responses=True,
    socket_connect_timeout=5,
    socket_timeout=5,
    retry_on_timeout=True
)

# Normal Redis işlemleri için ayrı pool
genel_pool = ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=50,
    decode_responses=True
)

# Önemli: Pub/Sub bağlantısı bloklandığı için
# normal Redis komutları için ayrı client kullan
pubsub_client = redis.Redis(connection_pool=pubsub_pool)
genel_client = redis.Redis(connection_pool=genel_pool)

Neden ayrı pool? Pub/Sub bağlantısı listen() ile bloke olur. Eğer aynı bağlantıyı SET, GET gibi normal komutlar için kullanmaya çalışırsan redis.exceptions.ResponseError alırsın. Bu hatayı ilk gördüğümde saatler harcamıştım, seni harcatmayalım.

Redis Sentinel ile Yüksek Erişilebilirlik

Tek bir Redis instance üzerinde Pub/Sub çalıştırmak production için risk. Master-Slave + Sentinel yapısı kur:

# sentinel.conf
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel auth-pass mymaster guclu_sifre_buraya
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 60000
sentinel parallel-syncs mymaster 1

# Pub/Sub için önemli: client-reconfig-script
sentinel client-reconfig-script mymaster /usr/local/bin/redis-pubsub-reconfig.sh

Python tarafında Sentinel’e bağlanma:

from redis.sentinel import Sentinel

sentinel = Sentinel(
    [
        ('sentinel1.sunucu.com', 26379),
        ('sentinel2.sunucu.com', 26379),
        ('sentinel3.sunucu.com', 26379)
    ],
    socket_timeout=0.5,
    password='guclu_sifre_buraya'
)

# Master'ı otomatik bul
master = sentinel.master_for('mymaster', socket_timeout=0.5)

# Pub/Sub için master bağlantısı
pubsub = master.pubsub()
pubsub.subscribe('sistem:alertler')

Dikkat: Failover sırasında subscriber bağlantıları kopar. Bunu handle etmek için reconnect logic yazman gerekiyor:

import time
import redis
from redis.sentinel import Sentinel

class DayanikliSubscriber:
    def __init__(self, sentinel_hosts, master_adi, kanallar):
        self.sentinel = Sentinel(sentinel_hosts, socket_timeout=0.5)
        self.master_adi = master_adi
        self.kanallar = kanallar
        self.max_yeniden_deneme = 10
        self.bekleme_suresi = 2
    
    def baglanti_kur(self):
        deneme = 0
        while deneme < self.max_yeniden_deneme:
            try:
                master = self.sentinel.master_for(self.master_adi)
                pubsub = master.pubsub()
                pubsub.subscribe(*self.kanallar)
                return pubsub
            except redis.ConnectionError as e:
                deneme += 1
                print(f"Bağlantı hatası ({deneme}/{self.max_yeniden_deneme}): {e}")
                time.sleep(self.bekleme_suresi * deneme)
        
        raise Exception("Maksimum yeniden deneme sayısına ulaşıldı")
    
    def calistir(self):
        while True:
            try:
                pubsub = self.baglanti_kur()
                print("Bağlantı kuruldu, mesajlar bekleniyor...")
                
                for mesaj in pubsub.listen():
                    if mesaj['type'] == 'message':
                        self.mesaj_isle(mesaj)
                        
            except (redis.ConnectionError, redis.TimeoutError) as e:
                print(f"Bağlantı koptu: {e}. Yeniden bağlanılıyor...")
                time.sleep(self.bekleme_suresi)
    
    def mesaj_isle(self, mesaj):
        print(f"Kanal: {mesaj['channel']} | Mesaj: {mesaj['data']}")

Monitoring: Pub/Sub Durumunu İzle

Production’da kör uçmak olmaz. Redis’in Pub/Sub istatistiklerini nasıl izleyeceğini bilmen gerekiyor:

# Aktif Pub/Sub bilgilerini gör
redis-cli info pubsub

# Çıktı şuna benzer:
# pubsub_channels:5
# pubsub_patterns:2
# pubsub_shardchannels:0

# Hangi channel'lar aktif ve kaç subscriber var
redis-cli PUBSUB CHANNELS "*"
redis-cli PUBSUB NUMSUB haberler siparis:olusturuldu
redis-cli PUBSUB NUMPAT

# Gerçek zamanlı komut izleme (dikkatli kullan, production'da yük yaratır)
redis-cli monitor | grep -E "PUBLISH|SUBSCRIBE"

Prometheus ile entegrasyon için redis_exporter kullan:

# redis_exporter kur
wget https://github.com/oliver006/redis_exporter/releases/download/v1.55.0/redis_exporter-v1.55.0.linux-amd64.tar.gz
tar xzf redis_exporter-v1.55.0.linux-amd64.tar.gz

# Çalıştır
./redis_exporter --redis.addr redis://localhost:6379 --web.listen-address :9121

Grafana’da takip etmek istediğin metrikler:

  • redis_pubsub_channels: Aktif channel sayısı
  • redis_pubsub_patterns: Aktif pattern sayısı
  • redis_commands_total{cmd=”publish”}: Saniyedeki publish sayısı
  • redis_connected_clients: Toplam bağlı client

Gerçek Dünya Senaryosu: Cache Invalidation

En sık kullandığım Pub/Sub senaryosu: birden fazla uygulama sunucusu çalışırken birinin cache’i güncellemesi gerektiğinde diğerlerine haber vermek. Şöyle çalışıyor:

import redis
import json

class CacheManager:
    INVALIDATION_CHANNEL = 'cache:invalidate'
    
    def __init__(self, sunucu_id):
        self.sunucu_id = sunucu_id
        self.redis = redis.Redis(host='localhost', decode_responses=True)
        self.yerel_cache = {}
        
        # Arka planda invalidation mesajlarını dinle
        self._invalidation_dinleyici_baslat()
    
    def _invalidation_dinleyici_baslat(self):
        import threading
        thread = threading.Thread(
            target=self._invalidation_dinle,
            daemon=True
        )
        thread.start()
    
    def _invalidation_dinle(self):
        pubsub = self.redis.pubsub()
        pubsub.subscribe(self.INVALIDATION_CHANNEL)
        
        for mesaj in pubsub.listen():
            if mesaj['type'] == 'message':
                veri = json.loads(mesaj['data'])
                
                # Kendi gönderdiğimiz mesajı işleme
                if veri['gonderen'] == self.sunucu_id:
                    continue
                
                anahtar = veri['anahtar']
                if anahtar in self.yerel_cache:
                    del self.yerel_cache[anahtar]
                    print(f"Cache temizlendi: {anahtar} (Kaynak: {veri['gonderen']})")
    
    def cache_guncelle(self, anahtar, deger):
        # Yerel cache'i güncelle
        self.yerel_cache[anahtar] = deger
        
        # Redis'e yaz
        self.redis.setex(anahtar, 3600, json.dumps(deger))
        
        # Diğer sunuculara haber ver
        self.redis.publish(
            self.INVALIDATION_CHANNEL,
            json.dumps({
                'anahtar': anahtar,
                'gonderen': self.sunucu_id
            })
        )
    
    def cache_oku(self, anahtar):
        # Önce yerel cache'e bak
        if anahtar in self.yerel_cache:
            return self.yerel_cache[anahtar]
        
        # Redis'e bak
        deger = self.redis.get(anahtar)
        if deger:
            self.yerel_cache[anahtar] = json.loads(deger)
            return self.yerel_cache[anahtar]
        
        return None

# Kullanım
manager = CacheManager(sunucu_id="web-sunucu-01")
manager.cache_guncelle("urun:5001", {"ad": "Laptop", "fiyat": 15000})

Yaygın Hatalar ve Çözümleri

Yıllar içinde insanların en çok takıldığı noktalara değineyim:

“Mesajlarım kayboldu” problemi: Subscriber offline durumdayken yayınlanan mesajlar gider. Bu Redis Pub/Sub’ın tasarımı gereği. Eğer mesaj kalıcılığı istiyorsan Pub/Sub’ı Redis Streams ile birleştir ya da direkt Streams’e geç.

Yüksek latency sorunu: pubsub.get_message() kullanıyorsan polling aralığını ayarla. Varsayılan değer çok gevşek olabilir:

# get_message ile manuel polling - daha fazla kontrol
while True:
    mesaj = pubsub.get_message(timeout=0.01)  # 10ms timeout
    if mesaj and mesaj['type'] == 'message':
        isle(mesaj)

“Too many connections” hatası:** Her Pub/Sub bağlantısı bir Redis client connection’ı tutar. maxclients limitine dikkat et ve connection pool’u doğru boyutlandır.

Subscribe edilen channel sayısı fazla: Binlerce farklı channel’a subscribe olmak yerine az sayıda channel kullan, mesaj içinde routing bilgisini taşı. Örneğin kullanici:123:bildirim yerine tek bildirimler channel’ı kullan, mesajın içine kullanici_id koy ve subscriber tarafında filtrele.

Sonuç

Redis Pub/Sub, doğru kullanım senaryosunda rakipsiz bir çözüm. Düşük latency, sıfır kurulum karmaşıklığı ve zaten altyapında olan bir araç. Ama sınırlarını bilmek şart: mesaj kalıcılığı yok, garantili iletim yok, yavaş consumer’ı beklemez.

Production’a almadan önce şu listeyi kafanda işaretle: bağlantı kopma durumunda reconnect logic yazıldı mı, connection pool doğru boyutlandırıldı mı, monitoring kuruldu mu, sentinel veya cluster yapısı mevcut mu?

Cache invalidation, gerçek zamanlı bildirimler ve mikroservis event broadcasting için Redis Pub/Sub’ı güvenle kullanabilirsin. Mesaj kuyruğu, iş dağıtımı veya garantili iletim gerektiğinde ise Redis Streams veya RabbitMQ’ya bakman gerekiyor. Doğru aracı doğru iş için kullanmak, sistem yöneticiliğinin özü bu zaten.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir