Redis ile Pub/Sub Mesajlaşma Sistemi Kurulumu
Üretim ortamında çalışan bir sistemde anlık bildirimler, kullanıcıya gerçek zamanlı mesaj iletimi ya da mikroservisler arası hızlı iletişim kurman gerektiğinde Redis Pub/Sub aklına ilk gelen çözümlerden biri olmalı. Ben yıllarca RabbitMQ ve Kafka ile çalıştım, ama basit ve hızlı bir mesajlaşma katmanına ihtiyaç duyduğumda Redis’in Pub/Sub mekanizması çoğu zaman kazandı. Neden mi? Çünkü zaten Redis’i cache için kullanıyorsun, ekstra bir bileşen eklemiyorsun ve latency inanılmaz derecede düşük.
Bu yazıda Redis Pub/Sub’ı gerçekten anlamak, doğru yapılandırmak ve production’da güvenle kullanmak için ihtiyacın olan her şeyi anlatacağım.
Redis Pub/Sub Nedir ve Ne Zaman Kullanmalısın
Redis’in Pub/Sub sistemi klasik bir mesajlaşma modelidir: Publisher mesaj yayınlar, Subscriber o mesajı dinler. Aralarında herhangi bir kuyruk yok, mesajlar bellekte tutulmuyor. Bir subscriber bağlı değilse o mesajı kaçırır, bu önemli bir nokta.
Bu özellik hem güçlü yanı hem de zayıf yanı. Eğer sisteminizde şu ihtiyaçlar varsa Redis Pub/Sub biçilmiş kaftan:
- Gerçek zamanlı bildirimler (kullanıcı online oldu, sipariş durumu değişti)
- Cache invalidation sinyalleri (birden fazla uygulama sunucusu aynı cache’i kullanıyorsa)
- Mikroservisler arası event broadcasting
- Live dashboard güncellemeleri
- Chat uygulamalarında basit mesajlaşma altyapısı
Eğer mesaj kalıcılığı, retry mekanizması ya da garantili iletim istiyorsan Redis Streams veya RabbitMQ tarafına bakman gerekir. Pub/Sub bunu sağlamaz, bekleme de.
Kurulum ve Temel Yapılandırma
Önce Redis’i kuralım. Ubuntu/Debian üzerinde:
sudo apt update
sudo apt install redis-server -y
# Servis durumunu kontrol et
sudo systemctl status redis-server
# Redis'e bağlan ve test et
redis-cli ping
# PONG çıktısı almalısın
Redis konfigürasyonuna bakalım. Pub/Sub için özel olarak ayarlaman gereken birkaç parametre var:
sudo nano /etc/redis/redis.conf
Şu satırları bulup düzenle:
# Dışarıdan bağlantıya izin vermek için (production'da dikkatli ol)
bind 127.0.0.1 192.168.1.100
# Maksimum client sayısı - Pub/Sub uzun süreli bağlantılar açar
maxclients 10000
# TCP keepalive - subscriber bağlantılarının kopup kopmadığını anlaman için kritik
tcp-keepalive 300
# Pub/Sub için timeout ayarı - 0 bırakırsan bağlantı hiç kapanmaz
# Subscriber'lar için genellikle 0 mantıklı
timeout 0
# Notify keyspace events - eğer keyspace notification da kullanacaksan
# notify-keyspace-events "KEA"
Değişiklikten sonra servisi yeniden başlat:
sudo systemctl restart redis-server
sudo systemctl enable redis-server
İlk Pub/Sub Testi: Redis CLI ile
Teoriden önce ellerin kirlenmeli. İki terminal aç:
Terminal 1 – Subscriber:
redis-cli
SUBSCRIBE haberler
# Çıktı:
# 1) "subscribe"
# 2) "haberler"
# 3) (integer) 1
Terminal 2 – Publisher:
redis-cli
PUBLISH haberler "Merhaba, bu ilk mesajımız!"
# Çıktı: (integer) 1
# Bu sayı kaç subscriber'ın mesajı aldığını gösterir
Terminal 1’de şunu görmelisin:
1) "message"
2) "haberler"
3) "Merhaba, bu ilk mesajımız!"
Basit ama güçlü. Şimdi birden fazla channel ile çalışalım. PSUBSCRIBE komutu pattern matching yapmanı sağlar:
# haber ile başlayan tüm channel'ları dinle
PSUBSCRIBE haber*
# veya belirli bir pattern
PSUBSCRIBE sistem:*:alert
Bu özelliği çok kullanıyorum. Örneğin kullanici:123:bildirim, kullanici:456:bildirim gibi dinamik channel isimlerini tek bir pattern ile dinleyebiliyorsun.
Python ile Gerçek Bir Pub/Sub Uygulaması
CLI ile oynamak güzel ama gerçek hayatta Python, Node.js veya Go kullanıyorsun. Python örneğiyle devam edelim. Önce kütüphaneyi kur:
pip install redis
Şimdi bir subscriber yazalım. Bu örnek e-ticaret sisteminde sipariş olaylarını dinleyen bir servis simülasyonu:
import redis
import json
import logging
import signal
import sys
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class SiparisSubscriber:
def __init__(self, host='localhost', port=6379, db=0):
self.redis_client = redis.Redis(
host=host,
port=port,
db=db,
decode_responses=True,
socket_keepalive=True,
socket_keepalive_options={},
health_check_interval=30
)
self.pubsub = self.redis_client.pubsub()
self.running = True
# Graceful shutdown için signal handler
signal.signal(signal.SIGINT, self._shutdown)
signal.signal(signal.SIGTERM, self._shutdown)
def _shutdown(self, signum, frame):
logger.info("Kapatma sinyali alındı, bağlantılar temizleniyor...")
self.running = False
self.pubsub.unsubscribe()
self.pubsub.close()
sys.exit(0)
def siparis_olusturuldu(self, mesaj):
veri = json.loads(mesaj['data'])
logger.info(f"Yeni sipariş: #{veri['siparis_id']} - Müşteri: {veri['musteri']}")
# Buraya fatura oluşturma, email gönderme vb. eklenebilir
def siparis_iptal_edildi(self, mesaj):
veri = json.loads(mesaj['data'])
logger.warning(f"Sipariş iptal: #{veri['siparis_id']} - Sebep: {veri.get('sebep', 'Belirtilmemiş')}")
def dinle(self):
# Her channel için ayrı handler tanımla
self.pubsub.subscribe(**{
'siparis:olusturuldu': self.siparis_olusturuldu,
'siparis:iptal': self.siparis_iptal_edildi
})
logger.info("Sipariş kanalları dinleniyor...")
# run_in_thread yerine manuel loop - daha fazla kontrol
for mesaj in self.pubsub.listen():
if not self.running:
break
if mesaj['type'] == 'message':
# Handler zaten yukarıda tanımlı, otomatik çalışır
pass
if __name__ == '__main__':
subscriber = SiparisSubscriber()
subscriber.dinle()
Publisher tarafı:
import redis
import json
from datetime import datetime
class SiparisPublisher:
def __init__(self, host='localhost', port=6379, db=0):
self.redis_client = redis.Redis(
host=host,
port=port,
db=db,
decode_responses=True
)
def siparis_yayinla(self, siparis_id, musteri, toplam):
veri = {
'siparis_id': siparis_id,
'musteri': musteri,
'toplam': toplam,
'zaman': datetime.now().isoformat()
}
abone_sayisi = self.redis_client.publish(
'siparis:olusturuldu',
json.dumps(veri)
)
return abone_sayisi # Kaç subscriber aldı?
def siparis_iptal_et(self, siparis_id, sebep=None):
veri = {
'siparis_id': siparis_id,
'sebep': sebep,
'zaman': datetime.now().isoformat()
}
return self.redis_client.publish(
'siparis:iptal',
json.dumps(veri)
)
# Kullanım
publisher = SiparisPublisher()
abone_sayisi = publisher.siparis_yayinla(
siparis_id=12345,
musteri="Ahmet Yılmaz",
toplam=299.99
)
print(f"Mesaj {abone_sayisi} abone tarafından alındı")
Connection Pooling ve Production Ayarları
Tek bir bağlantıyla Pub/Sub kullanan sistemlerin sonunda sorun yaşadığına defalarca tanıklık ettim. Production’da connection pool şart:
import redis
from redis import ConnectionPool
# Pub/Sub için ayrı pool
pubsub_pool = ConnectionPool(
host='localhost',
port=6379,
db=0,
max_connections=20,
decode_responses=True,
socket_connect_timeout=5,
socket_timeout=5,
retry_on_timeout=True
)
# Normal Redis işlemleri için ayrı pool
genel_pool = ConnectionPool(
host='localhost',
port=6379,
db=0,
max_connections=50,
decode_responses=True
)
# Önemli: Pub/Sub bağlantısı bloklandığı için
# normal Redis komutları için ayrı client kullan
pubsub_client = redis.Redis(connection_pool=pubsub_pool)
genel_client = redis.Redis(connection_pool=genel_pool)
Neden ayrı pool? Pub/Sub bağlantısı listen() ile bloke olur. Eğer aynı bağlantıyı SET, GET gibi normal komutlar için kullanmaya çalışırsan redis.exceptions.ResponseError alırsın. Bu hatayı ilk gördüğümde saatler harcamıştım, seni harcatmayalım.
Redis Sentinel ile Yüksek Erişilebilirlik
Tek bir Redis instance üzerinde Pub/Sub çalıştırmak production için risk. Master-Slave + Sentinel yapısı kur:
# sentinel.conf
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel auth-pass mymaster guclu_sifre_buraya
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 60000
sentinel parallel-syncs mymaster 1
# Pub/Sub için önemli: client-reconfig-script
sentinel client-reconfig-script mymaster /usr/local/bin/redis-pubsub-reconfig.sh
Python tarafında Sentinel’e bağlanma:
from redis.sentinel import Sentinel
sentinel = Sentinel(
[
('sentinel1.sunucu.com', 26379),
('sentinel2.sunucu.com', 26379),
('sentinel3.sunucu.com', 26379)
],
socket_timeout=0.5,
password='guclu_sifre_buraya'
)
# Master'ı otomatik bul
master = sentinel.master_for('mymaster', socket_timeout=0.5)
# Pub/Sub için master bağlantısı
pubsub = master.pubsub()
pubsub.subscribe('sistem:alertler')
Dikkat: Failover sırasında subscriber bağlantıları kopar. Bunu handle etmek için reconnect logic yazman gerekiyor:
import time
import redis
from redis.sentinel import Sentinel
class DayanikliSubscriber:
def __init__(self, sentinel_hosts, master_adi, kanallar):
self.sentinel = Sentinel(sentinel_hosts, socket_timeout=0.5)
self.master_adi = master_adi
self.kanallar = kanallar
self.max_yeniden_deneme = 10
self.bekleme_suresi = 2
def baglanti_kur(self):
deneme = 0
while deneme < self.max_yeniden_deneme:
try:
master = self.sentinel.master_for(self.master_adi)
pubsub = master.pubsub()
pubsub.subscribe(*self.kanallar)
return pubsub
except redis.ConnectionError as e:
deneme += 1
print(f"Bağlantı hatası ({deneme}/{self.max_yeniden_deneme}): {e}")
time.sleep(self.bekleme_suresi * deneme)
raise Exception("Maksimum yeniden deneme sayısına ulaşıldı")
def calistir(self):
while True:
try:
pubsub = self.baglanti_kur()
print("Bağlantı kuruldu, mesajlar bekleniyor...")
for mesaj in pubsub.listen():
if mesaj['type'] == 'message':
self.mesaj_isle(mesaj)
except (redis.ConnectionError, redis.TimeoutError) as e:
print(f"Bağlantı koptu: {e}. Yeniden bağlanılıyor...")
time.sleep(self.bekleme_suresi)
def mesaj_isle(self, mesaj):
print(f"Kanal: {mesaj['channel']} | Mesaj: {mesaj['data']}")
Monitoring: Pub/Sub Durumunu İzle
Production’da kör uçmak olmaz. Redis’in Pub/Sub istatistiklerini nasıl izleyeceğini bilmen gerekiyor:
# Aktif Pub/Sub bilgilerini gör
redis-cli info pubsub
# Çıktı şuna benzer:
# pubsub_channels:5
# pubsub_patterns:2
# pubsub_shardchannels:0
# Hangi channel'lar aktif ve kaç subscriber var
redis-cli PUBSUB CHANNELS "*"
redis-cli PUBSUB NUMSUB haberler siparis:olusturuldu
redis-cli PUBSUB NUMPAT
# Gerçek zamanlı komut izleme (dikkatli kullan, production'da yük yaratır)
redis-cli monitor | grep -E "PUBLISH|SUBSCRIBE"
Prometheus ile entegrasyon için redis_exporter kullan:
# redis_exporter kur
wget https://github.com/oliver006/redis_exporter/releases/download/v1.55.0/redis_exporter-v1.55.0.linux-amd64.tar.gz
tar xzf redis_exporter-v1.55.0.linux-amd64.tar.gz
# Çalıştır
./redis_exporter --redis.addr redis://localhost:6379 --web.listen-address :9121
Grafana’da takip etmek istediğin metrikler:
- redis_pubsub_channels: Aktif channel sayısı
- redis_pubsub_patterns: Aktif pattern sayısı
- redis_commands_total{cmd=”publish”}: Saniyedeki publish sayısı
- redis_connected_clients: Toplam bağlı client
Gerçek Dünya Senaryosu: Cache Invalidation
En sık kullandığım Pub/Sub senaryosu: birden fazla uygulama sunucusu çalışırken birinin cache’i güncellemesi gerektiğinde diğerlerine haber vermek. Şöyle çalışıyor:
import redis
import json
class CacheManager:
INVALIDATION_CHANNEL = 'cache:invalidate'
def __init__(self, sunucu_id):
self.sunucu_id = sunucu_id
self.redis = redis.Redis(host='localhost', decode_responses=True)
self.yerel_cache = {}
# Arka planda invalidation mesajlarını dinle
self._invalidation_dinleyici_baslat()
def _invalidation_dinleyici_baslat(self):
import threading
thread = threading.Thread(
target=self._invalidation_dinle,
daemon=True
)
thread.start()
def _invalidation_dinle(self):
pubsub = self.redis.pubsub()
pubsub.subscribe(self.INVALIDATION_CHANNEL)
for mesaj in pubsub.listen():
if mesaj['type'] == 'message':
veri = json.loads(mesaj['data'])
# Kendi gönderdiğimiz mesajı işleme
if veri['gonderen'] == self.sunucu_id:
continue
anahtar = veri['anahtar']
if anahtar in self.yerel_cache:
del self.yerel_cache[anahtar]
print(f"Cache temizlendi: {anahtar} (Kaynak: {veri['gonderen']})")
def cache_guncelle(self, anahtar, deger):
# Yerel cache'i güncelle
self.yerel_cache[anahtar] = deger
# Redis'e yaz
self.redis.setex(anahtar, 3600, json.dumps(deger))
# Diğer sunuculara haber ver
self.redis.publish(
self.INVALIDATION_CHANNEL,
json.dumps({
'anahtar': anahtar,
'gonderen': self.sunucu_id
})
)
def cache_oku(self, anahtar):
# Önce yerel cache'e bak
if anahtar in self.yerel_cache:
return self.yerel_cache[anahtar]
# Redis'e bak
deger = self.redis.get(anahtar)
if deger:
self.yerel_cache[anahtar] = json.loads(deger)
return self.yerel_cache[anahtar]
return None
# Kullanım
manager = CacheManager(sunucu_id="web-sunucu-01")
manager.cache_guncelle("urun:5001", {"ad": "Laptop", "fiyat": 15000})
Yaygın Hatalar ve Çözümleri
Yıllar içinde insanların en çok takıldığı noktalara değineyim:
“Mesajlarım kayboldu” problemi: Subscriber offline durumdayken yayınlanan mesajlar gider. Bu Redis Pub/Sub’ın tasarımı gereği. Eğer mesaj kalıcılığı istiyorsan Pub/Sub’ı Redis Streams ile birleştir ya da direkt Streams’e geç.
Yüksek latency sorunu: pubsub.get_message() kullanıyorsan polling aralığını ayarla. Varsayılan değer çok gevşek olabilir:
# get_message ile manuel polling - daha fazla kontrol
while True:
mesaj = pubsub.get_message(timeout=0.01) # 10ms timeout
if mesaj and mesaj['type'] == 'message':
isle(mesaj)
“Too many connections” hatası:** Her Pub/Sub bağlantısı bir Redis client connection’ı tutar. maxclients limitine dikkat et ve connection pool’u doğru boyutlandır.
Subscribe edilen channel sayısı fazla: Binlerce farklı channel’a subscribe olmak yerine az sayıda channel kullan, mesaj içinde routing bilgisini taşı. Örneğin kullanici:123:bildirim yerine tek bildirimler channel’ı kullan, mesajın içine kullanici_id koy ve subscriber tarafında filtrele.
Sonuç
Redis Pub/Sub, doğru kullanım senaryosunda rakipsiz bir çözüm. Düşük latency, sıfır kurulum karmaşıklığı ve zaten altyapında olan bir araç. Ama sınırlarını bilmek şart: mesaj kalıcılığı yok, garantili iletim yok, yavaş consumer’ı beklemez.
Production’a almadan önce şu listeyi kafanda işaretle: bağlantı kopma durumunda reconnect logic yazıldı mı, connection pool doğru boyutlandırıldı mı, monitoring kuruldu mu, sentinel veya cluster yapısı mevcut mu?
Cache invalidation, gerçek zamanlı bildirimler ve mikroservis event broadcasting için Redis Pub/Sub’ı güvenle kullanabilirsin. Mesaj kuyruğu, iş dağıtımı veya garantili iletim gerektiğinde ise Redis Streams veya RabbitMQ’ya bakman gerekiyor. Doğru aracı doğru iş için kullanmak, sistem yöneticiliğinin özü bu zaten.
