Redis Pub/Sub ile Gerçek Zamanlı Mesajlaşma Sistemi Kurulumu

Gerçek zamanlı bildirimler, canlı chat sistemleri, mikro servis iletişimi… Bunların hepsinin arkasında güçlü bir mesajlaşma altyapısı yatar. Redis’in Pub/Sub özelliği, bu altyapıyı kurmak için hem hızlı hem de düşük maliyetli bir çözüm sunar. Kafka veya RabbitMQ gibi ağır topçulara gitmeden önce Redis Pub/Sub’ın neler yapabileceğine bir bakalım.

Redis Pub/Sub Nedir ve Nasıl Çalışır?

Redis Pub/Sub, Publish/Subscribe (yayınla/abone ol) mesajlaşma paradigmasını uygular. Bu modelde mesaj gönderen taraf (publisher) mesajları doğrudan alıcılara göndermez. Bunun yerine mesajları belirli kanallara (channel) yayınlar. Mesaj almak isteyen taraflar (subscriber) ise bu kanallara abone olur.

Bu yaklaşımın en güzel yanı gevşek bağlantı (loose coupling) sağlamasıdır. Publisher, kaç tane subscriber olduğunu veya kim olduklarını bilmek zorunda değildir. Subscriber ise mesajın kim tarafından gönderildiğini bilmeden sadece ilgilendiği kanallara odaklanır.

Redis’in bu işi nasıl yaptığına bakacak olursak, mesajlar bellekte tutulur ve kalıcı depolama yapılmaz. Yani bir subscriber çevrimdışıyken gönderilen mesajları kaçırır. Bu durum bazı kullanım senaryoları için dezavantaj gibi görünse de in-memory yapısı sayesinde gecikme son derece düşüktür.

Kurulum ve Temel Yapılandırma

Öncelikle Redis’in sisteminizde çalıştığından emin olalım. Aşağıdaki komutlarla Redis’i kurup başlatabilirsiniz:

# Ubuntu/Debian
sudo apt update
sudo apt install redis-server -y

# CentOS/RHEL
sudo yum install redis -y
sudo systemctl enable redis
sudo systemctl start redis

# Servis durumunu kontrol et
sudo systemctl status redis
redis-cli ping

Redis yapılandırma dosyasında Pub/Sub ile ilgili birkaç önemli parametre bulunur:

# /etc/redis/redis.conf içinde düzenlenecek parametreler
sudo nano /etc/redis/redis.conf

İlgili parametreler:

  • client-output-buffer-limit pubsub: Subscriber’ların okuyamadığı mesajların bellekte ne kadar tutulacağını belirler. Varsayılan değer genellikle yeterlidir ama yoğun trafik altında artırmanız gerekebilir
  • hz: Redis’in arka plan görevleri çalıştırma sıklığı. Varsayılan 10’dur, Pub/Sub yoğun kullanımında 100’e çıkabilirsiniz
  • maxmemory-policy: Bellek dolduğunda Redis’in nasıl davranacağını belirler. Pub/Sub için noeviction veya allkeys-lru uygun seçimlerdir
  • tcp-keepalive: Uzun süreli bağlantıların canlı kalması için önemlidir. 60 saniye iyi bir başlangıç değeridir

redis-cli ile Pub/Sub Test Etme

Teoriden önce ellerimizi kirletelim. İki terminal açıp temel Pub/Sub mekanizmasını test edelim:

# Terminal 1 - Subscriber
redis-cli
SUBSCRIBE haberler spor

# Beklenen çıktı:
# 1) "subscribe"
# 2) "haberler"
# 3) (integer) 1
# 1) "subscribe"
# 2) "spor"
# 3) (integer) 2
# Terminal 2 - Publisher
redis-cli
PUBLISH haberler "Bugün hava çok güzel"
PUBLISH spor "Fenerbahçe maçı başladı"
PUBLISH ekonomi "Dolar düştü"  # Bu mesaj kimse almayacak

# Terminal 1'de görünecek çıktı:
# 1) "message"
# 2) "haberler"
# 3) "Bugün hava çok güzel"
# 1) "message"
# 2) "spor"
# 3) "Fenerbahçe maçı başladı"

Gördüğünüz gibi “ekonomi” kanalına abone olmadığımız için o mesaj kayboldu. Bu Pub/Sub’ın fire-and-forget yapısını gösteriyor.

Pattern Tabanlı Abonelik

Redis sadece belirli kanal isimlerine değil, pattern (desen) kullanarak birden fazla kanala aynı anda abone olmanızı sağlar. Bu özellik PSUBSCRIBE komutuyla kullanılır:

# Terminal 1 - Pattern Subscriber
redis-cli
PSUBSCRIBE "log.*"
# Bu abonelik log.error, log.warning, log.info gibi tüm kanalları kapsar

PSUBSCRIBE "user.*.notifications"
# user.123.notifications, user.456.notifications gibi kanalları yakalar
# Terminal 2 - Publisher
redis-cli
PUBLISH log.error "Veritabanı bağlantısı kesildi"
PUBLISH log.warning "Disk dolmak üzere"
PUBLISH log.info "Uygulama başlatıldı"
PUBLISH user.123.notifications "Profilinize yorum yapıldı"

Pattern aboneliği, dinamik kanal isimleriyle çalışırken hayat kurtarır. Özellikle mikro servis mimarisinde her servisin kendi namespace’i olduğunda bu özelliği sık sık kullanacaksınız.

Python ile Gerçek Dünya Uygulaması

Şimdi gerçek bir senaryoya geçelim. Bir e-ticaret platformu düşünün; sipariş verildiğinde e-posta servisi, SMS servisi ve stok servisi aynı anda bilgilendirilmesi gerekiyor. Redis Pub/Sub bu iş için biçilmiş kaftan.

pip install redis
# publisher.py - Sipariş servisi
import redis
import json
import time

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

def siparis_yayinla(siparis_id, musteri_id, urunler, toplam_tutar):
    mesaj = {
        'siparis_id': siparis_id,
        'musteri_id': musteri_id,
        'urunler': urunler,
        'toplam_tutar': toplam_tutar,
        'zaman': time.time()
    }
    
    kanal = f"siparis.yeni"
    gonderilen = r.publish(kanal, json.dumps(mesaj))
    
    print(f"Mesaj gönderildi. {gonderilen} subscriber aldı.")
    return gonderilen

# Test için birkaç sipariş yayınlayalım
if __name__ == "__main__":
    siparisler = [
        ("SP001", "USR123", ["Laptop", "Mouse"], 15000),
        ("SP002", "USR456", ["Klavye"], 500),
        ("SP003", "USR789", ["Monitor", "Webcam"], 8000),
    ]
    
    for siparis in siparisler:
        siparis_yayinla(*siparis)
        time.sleep(1)
# subscriber.py - Çoklu servis dinleyicisi
import redis
import json
import threading
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
pubsub = r.pubsub()

def email_servisi(mesaj):
    if mesaj['type'] == 'message':
        veri = json.loads(mesaj['data'])
        logging.getLogger('EmailServisi').info(
            f"E-posta gönderiliyor - Sipariş: {veri['siparis_id']}, "
            f"Müşteri: {veri['musteri_id']}"
        )
        # Gerçek e-posta gönderme kodu buraya gelir

def sms_servisi(mesaj):
    if mesaj['type'] == 'message':
        veri = json.loads(mesaj['data'])
        logging.getLogger('SMSServisi').info(
            f"SMS gönderiliyor - Sipariş: {veri['siparis_id']}, "
            f"Tutar: {veri['toplam_tutar']} TL"
        )

def stok_servisi(mesaj):
    if mesaj['type'] == 'message':
        veri = json.loads(mesaj['data'])
        urunler = veri.get('urunler', [])
        logging.getLogger('StokServisi').info(
            f"Stok güncelleniyor - Ürünler: {', '.join(urunler)}"
        )

# Her servis için ayrı kanal dinleyici oluştur
pubsub.subscribe(**{
    'siparis.yeni': email_servisi,
    'siparis.yeni': sms_servisi,   # Aynı kanala birden fazla handler
})

# Daha iyi yaklaşım: Her servis için ayrı subscriber örneği
pubsub_email = r.pubsub()
pubsub_sms = r.pubsub()
pubsub_stok = r.pubsub()

pubsub_email.subscribe(**{'siparis.yeni': email_servisi})
pubsub_sms.subscribe(**{'siparis.yeni': sms_servisi})
pubsub_stok.subscribe(**{'siparis.yeni': stok_servisi})

def dinle(ps, servis_adi):
    logging.getLogger(servis_adi).info("Dinlemeye başlandı...")
    for mesaj in ps.listen():
        pass  # Handler'lar otomatik çağrılır

# Her servisi ayrı thread'de çalıştır
thread_email = threading.Thread(target=dinle, args=(pubsub_email, 'Email'))
thread_sms = threading.Thread(target=dinle, args=(pubsub_sms, 'SMS'))
thread_stok = threading.Thread(target=dinle, args=(pubsub_stok, 'Stok'))

for t in [thread_email, thread_sms, thread_stok]:
    t.daemon = True
    t.start()

# Ana thread'i canlı tut
import time
while True:
    time.sleep(1)

Node.js ile Chat Uygulaması

Başka bir gerçek dünya senaryosu olarak gerçek zamanlı chat sistemi kuralım. Node.js ile Socket.IO’yu Redis Pub/Sub ile birleştirdiğinizde çok sunuculu ortamlarda bile tutarlı bir chat deneyimi sunabilirsiniz:

# chat_server.js
const redis = require('redis');
const http = require('http');
const socketIo = require('socket.io');

const yayinci = redis.createClient({ host: 'localhost', port: 6379 });
const abone = redis.createClient({ host: 'localhost', port: 6379 });

const server = http.createServer();
const io = socketIo(server);

// Redis bağlantılarını aç
async function baglan() {
    await yayinci.connect();
    await abone.connect();
    console.log('Redis bağlantıları kuruldu');
}

baglan();

// Redis kanalına abone ol
abone.subscribe('chat:genel', (mesaj) => {
    const veri = JSON.parse(mesaj);
    // Tüm bağlı Socket.IO istemcilerine ilet
    io.emit('yeni_mesaj', veri);
});

io.on('connection', (socket) => {
    console.log(`Yeni kullanıcı bağlandı: ${socket.id}`);
    
    socket.on('mesaj_gonder', async (veri) => {
        const mesaj = {
            kullanici: veri.kullanici,
            icerik: veri.icerik,
            zaman: new Date().toISOString(),
            socket_id: socket.id
        };
        
        // Redis'e yayınla - tüm sunucular duyacak
        await yayinci.publish('chat:genel', JSON.stringify(mesaj));
    });
    
    socket.on('oda_join', async (oda) => {
        socket.join(oda);
        // Oda bazlı abonelik
        await abone.subscribe(`chat:oda:${oda}`, (mesaj) => {
            io.to(oda).emit('oda_mesaj', JSON.parse(mesaj));
        });
    });
    
    socket.on('disconnect', () => {
        console.log(`Kullanıcı ayrıldı: ${socket.id}`);
    });
});

server.listen(3000, () => {
    console.log('Chat sunucusu 3000 portunda çalışıyor');
});

Bu yapının güzelliği şu: Yük dengeleme için birden fazla Node.js sunucusu çalıştırsanız bile hepsi aynı Redis kanalını dinleyeceği için bir sunucuya gelen mesaj diğer sunuculardaki kullanıcılara da ulaşır.

İzleme ve Yönetim Komutları

Pub/Sub sistemini yönetirken birkaç kritik komut bilmek gerekiyor:

# Aktif kanalları ve subscriber sayılarını listele
redis-cli PUBSUB CHANNELS "*"
redis-cli PUBSUB CHANNELS "log.*"

# Belirli kanaldaki subscriber sayısını öğren
redis-cli PUBSUB NUMSUB haberler spor log.error

# Pattern subscriber sayısını öğren
redis-cli PUBSUB NUMPAT

# Tüm Redis bağlantılarını ve durumlarını gör
redis-cli CLIENT LIST

# Pub/Sub istatistiklerini izle
redis-cli INFO stats | grep pubsub

# Gerçek zamanlı komut izleme (dikkatli kullanın, performansı etkiler)
redis-cli MONITOR

Bir kanal üzerinde kaç tane subscriber olduğunu bilmek, sistemin sağlıklı çalışıp çalışmadığını anlamak için önemlidir. Eğer NUMSUB komutu 0 döndürüyorsa o kanala mesaj göndermek boşa gidecek demektir.

Üretim Ortamı için Yapılandırma

Geliştirme ortamında çalışan bir Pub/Sub sistemi üretimde sorun çıkarabilir. İşte üretim için dikkat etmeniz gereken noktalar:

# /etc/redis/redis.conf - Üretim ayarları

# Pub/Sub buffer ayarları
# Format: client-output-buffer-limit <class> <hard limit> <soft limit> <soft seconds>
client-output-buffer-limit pubsub 32mb 8mb 60

# Yavaş subscriber'ların sistemi tıkamasını önler
# Soft limit: 8MB'a ulaşınca 60 saniye içinde temizlenmezse bağlantı kesilir
# Hard limit: 32MB'a ulaşınca anında kesilir

# TCP keepalive - uzun süre mesaj gelmeyen bağlantıları canlı tut
tcp-keepalive 60

# Bağlantı timeout - pasif bağlantıları temizle
timeout 300

# Maksimum bağlantı sayısı
maxclients 10000

# Loglama
loglevel notice
logfile /var/log/redis/redis-server.log

Yüksek erişilebilirlik için Redis Sentinel kurulumu:

# sentinel.conf
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 60000
sentinel parallel-syncs mymaster 1

# Sentinel başlatma
redis-sentinel /etc/redis/sentinel.conf &

# Sentinel durumunu kontrol et
redis-cli -p 26379 SENTINEL masters
redis-cli -p 26379 SENTINEL slaves mymaster

Yaygın Sorunlar ve Çözümleri

Pub/Sub kullanırken karşılaşılan tipik sorunlara ve çözümlerine bakalım.

Mesaj kaybı sorunu: Subscriber geçici olarak çevrimdışı olduğunda mesajları kaçırır. Bunu çözmek için pub/sub mesajlarını aynı anda bir Redis listesine de yazabilirsiniz:

# Mesajları hem yayınla hem de listede sakla
# publisher_guvenilir.py
import redis
import json
import time

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

def guvenilir_yayinla(kanal, mesaj_dict, gecmis_suresi=3600):
    mesaj_str = json.dumps(mesaj_dict)
    
    # Pipeline ile atomik işlem
    pipe = r.pipeline()
    
    # Hem yayınla hem de geçmişe kaydet
    pipe.publish(kanal, mesaj_str)
    pipe.lpush(f"{kanal}:gecmis", mesaj_str)
    pipe.ltrim(f"{kanal}:gecmis", 0, 999)  # Son 1000 mesajı tut
    pipe.expire(f"{kanal}:gecmis", gecmis_suresi)
    
    sonuclar = pipe.execute()
    return sonuclar[0]  # Subscriber sayısı

def gecmis_mesajlari_al(kanal, adet=100):
    mesajlar = r.lrange(f"{kanal}:gecmis", 0, adet - 1)
    return [json.loads(m) for m in mesajlar]

Bağlantı kopması sonrası yeniden bağlanma:

# reconnect_subscriber.py
import redis
import time
import logging

log = logging.getLogger(__name__)

def bagli_abone_ol(kanal, handler, max_deneme=5):
    deneme = 0
    
    while deneme < max_deneme:
        try:
            r = redis.Redis(
                host='localhost',
                port=6379,
                socket_keepalive=True,
                socket_keepalive_options={},
                health_check_interval=30
            )
            
            pubsub = r.pubsub()
            pubsub.subscribe(**{kanal: handler})
            
            log.info(f"{kanal} kanalı dinleniyor...")
            deneme = 0  # Başarılı bağlantıda sıfırla
            
            for mesaj in pubsub.listen():
                pass
                
        except redis.ConnectionError as e:
            deneme += 1
            bekleme = min(2 ** deneme, 30)  # Exponential backoff
            log.error(f"Bağlantı hatası: {e}. {bekleme}s sonra tekrar denenecek...")
            time.sleep(bekleme)
        except Exception as e:
            log.critical(f"Beklenmedik hata: {e}")
            raise

Performans Testleri

Sisteminizin ne kadar mesaj kaldırabileceğini anlamak için basit bir benchmark yapabilirsiniz:

# Redis'in kendi benchmark aracıyla test
redis-benchmark -t subscribe -c 100 -n 10000

# Özel Pub/Sub benchmark
# benchmark.py
import redis
import time
import threading

r_pub = redis.Redis(host='localhost', port=6379)
r_sub = redis.Redis(host='localhost', port=6379)

alindi = {'sayi': 0, 'baslangic': None}

def abone_thread():
    ps = r_sub.pubsub()
    ps.subscribe('benchmark')
    
    for mesaj in ps.listen():
        if mesaj['type'] == 'message':
            if alindi['baslangic'] is None:
                alindi['baslangic'] = time.time()
            alindi['sayi'] += 1
            
            if alindi['sayi'] % 1000 == 0:
                gecen = time.time() - alindi['baslangic']
                print(f"{alindi['sayi']} mesaj / {gecen:.2f}s = "
                      f"{alindi['sayi']/gecen:.0f} msg/s")

t = threading.Thread(target=abone_thread, daemon=True)
t.start()
time.sleep(0.5)

# 10000 mesaj gönder
for i in range(10000):
    r_pub.publish('benchmark', f"test mesaj {i}")

time.sleep(2)
print(f"Toplam alınan: {alindi['sayi']}")

Tipik bir Redis kurulumunda tek bir sunucuda saniyede 100.000’in üzerinde mesaj işleyebilirsiniz. Bu rakam çoğu kullanım senaryosu için fazlasıyla yeterlidir.

Redis Pub/Sub vs Diğer Çözümler

Ne zaman Redis Pub/Sub kullanmalısınız, ne zaman başka bir çözüme geçmelisiniz?

Redis Pub/Sub tercih etmeniz gereken durumlar:

  • Basit bildirim sistemleri ve cache invalidation
  • Gerçek zamanlı pano (dashboard) güncellemeleri
  • Mikro servisler arası event bildirimleri
  • Düşük gecikme kritik önem taşıyorsa
  • Zaten Redis altyapısı kullanıyorsanız

Kafka veya RabbitMQ gibi araçlara geçmeniz gereken durumlar:

  • Mesajların kaybolmaması kesinlikle gerekiyorsa (kalıcı mesaj kuyruğu)
  • Subscriber geç bağlandığında eski mesajlara erişmesi gerekiyorsa
  • Mesajların işlenip işlenmediğini takip etmeniz gerekiyorsa
  • Milyonlarca mesaj ve karmaşık routing kurallarınız varsa
  • Tüketici grupları (consumer groups) ihtiyacınız varsa

Sonuç

Redis Pub/Sub, doğru kullanım senaryosunda inanılmaz güçlü ve pratik bir araçtır. Kurulum kolaylığı, düşük gecikme süresi ve zaten çoğu sistemde mevcut olan Redis altyapısından yararlanabilme özelliği onu cazibeli kılar.

Ancak her araç gibi sınırlamaları var: mesaj kalıcılığı yok, geç bağlanan subscriber’lar mesajları kaçırır, ölçekleme sınırlı. Bu kısıtlamaları baştan kabul ederek sisteminizi buna göre tasarlamanız şart.

Pratik öneri olarak şunu söylerim: e-ticaret siparişleri, ödeme işlemleri gibi kritik iş akışları için Redis Pub/Sub’ı tek başına kullanmayın. Bunun yanına bir Redis List veya Redis Streams ekleyin. Ama bildirimler, canlı veri akışları, cache invalidation gibi “kaçırılsa da dünya batmaz” türündeki mesajlar için Redis Pub/Sub mükemmel bir seçimdir.

Gece yarısı pager’ınızı çaldırmak istemiyorsanız, üretim ortamına geçmeden önce mutlaka bağlantı kopması senaryolarını, yüksek mesaj hacmini ve subscriber yokken mesaj gönderme durumlarını test edin. Redis güvenilir bir araçtır ama onu kullanan uygulama katmanı dikkatli yazılmadığında tüm güvenilirlik su gibi gider.

Yorum yapın