Python ile Redis Pub/Sub Uygulaması Geliştirme
Mesaj kuyruklarıyla ilk ciddi işim Redis Pub/Sub üzerineydi. O zamanlar RabbitMQ gibi ağır bir çözüm kurmaya zamanımız yoktu, basit bir bildirim sistemi lazımdı ve Redis zaten altyapıda vardı. “Pub/Sub özelliği var mı?” diye baktım, vardı. Birkaç saat içinde çalışan bir prototip çıkardık. Ama asıl öğrenme o prototipin production’a girdiğinde başladı.
Bu yazıda Python ile Redis Pub/Sub’ı sıfırdan ele alacağım. Sadece “şu kodu çalıştır çalışıyor” değil, gerçek senaryolarda nelere dikkat etmen gerektiğini, hangi tuzaklara düştüğümü ve nasıl aştığımı aktaracağım.
Redis Pub/Sub Nedir, Ne Değildir?
Redis Pub/Sub, publisher/subscriber (yayıncı/abone) modelini uygulayan bir mesajlaşma mekanizmasıdır. Bir kanal üzerinde yayınlanan mesaj, o kanala abone olan tüm client’lara anlık iletilir.
Ama şunu baştan netleştirelim: Redis Pub/Sub bir mesaj kuyruğu değildir. Abone yokken yayınlanan mesajlar kaybolur. Bir subscriber bağlantıyı kaybedip tekrar bağlandığında, aradaki mesajları alamaz. Bu, sistemin tasarımında kritik bir faktördür.
Peki ne zaman kullanılır?
- Gerçek zamanlı bildirimler (kullanıcıya anlık alert göndermek gibi)
- Servisler arası event yayını (microservice mimarilerinde)
- Cache invalidation sinyalleri
- Canlı dashboard güncellemeleri
- Chat uygulamaları
Kalıcı mesaj garantisi gereken durumlarda Redis Streams veya RabbitMQ/Kafka tercih edilmeli. Ama hızlı, basit ve fire-and-forget tarzı bir iletişim için Pub/Sub son derece etkilidir.
Ortamı Hazırlama
Önce gerekli bağımlılıkları kuralım:
pip install redis==5.0.1
pip install python-dotenv
Redis sunucunuz yoksa Docker ile hızlıca ayağa kaldırabilirsiniz:
docker run -d
--name redis-pubsub
-p 6379:6379
redis:7.2-alpine
redis-server --loglevel warning
Bağlantı ayarlarını environment variable olarak tutmak iyi bir alışkanlık. Özellikle birden fazla ortam (dev/staging/prod) varsa config dosyasına gömmemek lazım:
# .env dosyası
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
REDIS_PASSWORD=
REDIS_SOCKET_TIMEOUT=5
REDIS_SOCKET_CONNECT_TIMEOUT=5
Temel Bağlantı Yapısı
Uygulamanın her yerinde ayrı ayrı connection açmak yerine merkezi bir connection factory kullanmak, connection leak’leri önler ve bağlantı ayarlarını tek yerden yönetmenizi sağlar:
import redis
import os
from dotenv import load_dotenv
load_dotenv()
class RedisConnection:
_pool = None
@classmethod
def get_pool(cls):
if cls._pool is None:
cls._pool = redis.ConnectionPool(
host=os.getenv('REDIS_HOST', 'localhost'),
port=int(os.getenv('REDIS_PORT', 6379)),
db=int(os.getenv('REDIS_DB', 0)),
password=os.getenv('REDIS_PASSWORD') or None,
socket_timeout=int(os.getenv('REDIS_SOCKET_TIMEOUT', 5)),
socket_connect_timeout=int(os.getenv('REDIS_SOCKET_CONNECT_TIMEOUT', 5)),
max_connections=20,
decode_responses=True
)
return cls._pool
@classmethod
def get_client(cls):
return redis.Redis(connection_pool=cls.get_pool())
@classmethod
def get_pubsub_client(cls):
"""
Pub/Sub için ayrı bir connection kullanmak şart.
Connection pool'daki bir bağlantı subscribe moduna
girince başka işlemler için kullanılamaz hale gelir.
"""
return redis.Redis(
host=os.getenv('REDIS_HOST', 'localhost'),
port=int(os.getenv('REDIS_PORT', 6379)),
db=int(os.getenv('REDIS_DB', 0)),
password=os.getenv('REDIS_PASSWORD') or None,
socket_timeout=None, # Subscribe için timeout None olmalı
decode_responses=True
)
socket_timeout=None dikkat çekici bir detay. Subscriber tarafında timeout verirseniz, mesaj gelmediği sürelerde bağlantı zaman aşımına uğrar. Bu da gereksiz reconnect döngülerine neden olur.
Publisher Tarafı
Publisher son derece basittir. Bir kanal adı ve mesaj içeriği yeterli:
import json
import time
import logging
from datetime import datetime
from redis_connection import RedisConnection
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('publisher')
class EventPublisher:
def __init__(self):
self.client = RedisConnection.get_client()
def publish(self, channel: str, event_type: str, data: dict) -> int:
"""
Mesajı JSON formatında yayınlar.
Dönüş değeri: mesajı alan subscriber sayısı
"""
message = {
'event_type': event_type,
'data': data,
'timestamp': datetime.utcnow().isoformat(),
'publisher_id': os.getenv('SERVICE_NAME', 'unknown')
}
payload = json.dumps(message, ensure_ascii=False)
receiver_count = self.client.publish(channel, payload)
logger.info(
f"Kanal: {channel} | Event: {event_type} | "
f"Alıcı sayısı: {receiver_count}"
)
return receiver_count
def publish_bulk(self, channel: str, events: list) -> None:
"""
Pipeline kullanarak toplu publish - network round-trip'leri azaltır
"""
pipe = self.client.pipeline()
for event in events:
payload = json.dumps(event, ensure_ascii=False)
pipe.publish(channel, payload)
results = pipe.execute()
logger.info(f"{len(results)} mesaj yayınlandı")
# Kullanım örneği - sipariş servisi senaryosu
if __name__ == '__main__':
import os
publisher = EventPublisher()
# Sipariş oluşturuldu eventi
publisher.publish(
channel='orders',
event_type='order.created',
data={
'order_id': 'ORD-20241215-001',
'customer_id': 'USR-4521',
'total_amount': 1250.00,
'currency': 'TRY'
}
)
# Stok güncelleme sinyali
publisher.publish(
channel='inventory',
event_type='stock.low',
data={
'product_id': 'PRD-789',
'current_stock': 5,
'threshold': 10
}
)
publish() metodunun dönüş değeri olan alıcı sayısı önemli bir bilgidir. Eğer 0 dönüyorsa, o an kanalı dinleyen kimse yok demektir ve mesaj kayboldu. Bu durumu loglayıp kalıcı depolama ile tamamlamanız gerekebilir.
Subscriber Tarafı – Temel Yapı
import json
import signal
import logging
from redis_connection import RedisConnection
logger = logging.getLogger('subscriber')
class EventSubscriber:
def __init__(self, service_name: str):
self.service_name = service_name
self.client = RedisConnection.get_pubsub_client()
self.pubsub = self.client.pubsub()
self._running = False
# Graceful shutdown için sinyal yakalama
signal.signal(signal.SIGTERM, self._handle_shutdown)
signal.signal(signal.SIGINT, self._handle_shutdown)
def _handle_shutdown(self, signum, frame):
logger.info(f"{self.service_name} kapatılıyor...")
self._running = False
def subscribe(self, *channels):
self.pubsub.subscribe(*channels)
logger.info(f"Abone olundu: {channels}")
def subscribe_pattern(self, *patterns):
"""
Pattern-based subscription - wildcard ile çoklu kanal dinleme
Örnek: 'orders.*' -> orders.created, orders.updated, orders.cancelled
"""
self.pubsub.psubscribe(*patterns)
logger.info(f"Pattern aboneliği: {patterns}")
def listen(self, handlers: dict):
"""
handlers: {'event_type': callable} formatında
Örnek: {'order.created': handle_new_order}
"""
self._running = True
logger.info(f"{self.service_name} mesaj beklemeye başladı")
for raw_message in self.pubsub.listen():
if not self._running:
break
if raw_message['type'] not in ('message', 'pmessage'):
continue
try:
message = json.loads(raw_message['data'])
event_type = message.get('event_type')
handler = handlers.get(event_type) or handlers.get('*')
if handler:
handler(message)
else:
logger.warning(f"Handler bulunamadı: {event_type}")
except json.JSONDecodeError as e:
logger.error(f"Geçersiz JSON mesajı: {e}")
except Exception as e:
logger.error(f"Handler hatası: {e}", exc_info=True)
self.pubsub.close()
logger.info(f"{self.service_name} durduruldu")
Gerçek Dünya Senaryosu: E-Ticaret Bildirim Sistemi
Şimdi bunu gerçekçi bir senaryoya oturtalım. Bir e-ticaret platformunda sipariş oluşturulduğunda; bildirim servisi, stok servisi ve analitik servisi aynı anda haberdar olsun.
# notification_service.py
import smtplib
import logging
from subscriber import EventSubscriber
logger = logging.getLogger('notification-service')
def handle_order_created(message: dict):
data = message['data']
logger.info(f"Yeni sipariş bildirimi gönderiliyor: {data['order_id']}")
# Email gönderme simülasyonu
# Gerçekte burada SMTP veya SendGrid entegrasyonu olur
print(f"EMAIL: Siparişiniz alındı! #{data['order_id']} - {data['total_amount']} TRY")
def handle_order_cancelled(message: dict):
data = message['data']
logger.info(f"İptal bildirimi: {data['order_id']}")
print(f"SMS: Siparişiniz #{data['order_id']} iptal edildi")
def handle_payment_failed(message: dict):
data = message['data']
logger.warning(f"Ödeme başarısız: {data.get('order_id')}")
print(f"PUSH: Ödemeniz işlenemedi, lütfen tekrar deneyin")
if __name__ == '__main__':
subscriber = EventSubscriber('notification-service')
# Pattern ile 'orders' ve 'payments' kanallarının tamamını dinle
subscriber.subscribe_pattern('orders.*', 'payments.*')
handlers = {
'order.created': handle_order_created,
'order.cancelled': handle_order_cancelled,
'payment.failed': handle_payment_failed,
'*': lambda msg: logger.debug(f"İşlenmeyen event: {msg['event_type']}")
}
subscriber.listen(handlers)
# inventory_service.py - Stok servisi aynı kanalı dinliyor
import logging
from subscriber import EventSubscriber
logger = logging.getLogger('inventory-service')
class InventoryHandler:
def __init__(self):
# Gerçekte bu bir veritabanı bağlantısı olurdu
self.stock = {'PRD-789': 15, 'PRD-100': 3}
def handle_order_created(self, message: dict):
data = message['data']
order_id = data['order_id']
logger.info(f"Stok rezervasyonu: {order_id}")
# Stok güncelleme mantığı burada
# Eğer stok yetersizse 'inventory' kanalına yayın yapabilir
for item_id, quantity in data.get('items', {}).items():
if item_id in self.stock:
self.stock[item_id] -= quantity
logger.info(f"{item_id} stok güncellendi: {self.stock[item_id]}")
if __name__ == '__main__':
handler = InventoryHandler()
subscriber = EventSubscriber('inventory-service')
subscriber.subscribe('orders')
subscriber.listen({
'order.created': handler.handle_order_created,
})
Bağlantı Kopması ve Yeniden Bağlanma
Production’da en sık yaşanan sorun bağlantı kopmaları. Redis sunucusu restart oldu, ağ kısa süreliğine çöktü, subscriber duraksadı… Bunları yönetmek şart:
import time
import redis
import logging
from redis_connection import RedisConnection
logger = logging.getLogger('resilient-subscriber')
class ResilientSubscriber:
def __init__(self, channels: list, handlers: dict, max_retries: int = 5):
self.channels = channels
self.handlers = handlers
self.max_retries = max_retries
self.retry_delay = 1 # saniye, her denemede ikiyle çarpılır
def _create_pubsub(self):
client = RedisConnection.get_pubsub_client()
pubsub = client.pubsub()
pubsub.subscribe(*self.channels)
return pubsub
def run(self):
retry_count = 0
while retry_count < self.max_retries:
try:
pubsub = self._create_pubsub()
logger.info(f"Bağlandı, kanallar: {self.channels}")
retry_count = 0 # Başarılı bağlantıda sıfırla
self.retry_delay = 1
for raw_message in pubsub.listen():
if raw_message['type'] != 'message':
continue
import json
message = json.loads(raw_message['data'])
event_type = message.get('event_type')
handler = self.handlers.get(event_type)
if handler:
handler(message)
except redis.ConnectionError as e:
retry_count += 1
logger.warning(
f"Bağlantı hatası (deneme {retry_count}/{self.max_retries}): {e}"
)
time.sleep(self.retry_delay)
self.retry_delay = min(self.retry_delay * 2, 60) # Max 60 saniye
except redis.TimeoutError as e:
logger.warning(f"Zaman aşımı, yeniden bağlanılıyor: {e}")
time.sleep(2)
except Exception as e:
logger.error(f"Beklenmeyen hata: {e}", exc_info=True)
retry_count += 1
time.sleep(self.retry_delay)
logger.critical(f"Maksimum deneme sayısına ulaşıldı ({self.max_retries})")
raise RuntimeError("Subscriber başlatılamadı")
Exponential backoff burada kritik. Sürekli aynı hızda retry yapmak hem Redis sunucusuna gereksiz yük bindirir hem de log’ları gereksiz bildirimlerle doldurur.
Threading ile Çoklu Kanal Yönetimi
Bazı durumlarda aynı süreçten birden fazla kanal dinlemek gerekebilir. Redis’in pubsub.run_in_thread() metodu bu iş için biçilmiş kaftan:
import redis
import json
import time
import logging
from redis_connection import RedisConnection
logger = logging.getLogger('threaded-subscriber')
class ThreadedSubscriberManager:
def __init__(self):
self.client = RedisConnection.get_pubsub_client()
self.pubsub = self.client.pubsub()
self._thread = None
def register_handler(self, channel: str, handler):
"""
Her kanal için ayrı handler kaydeder.
run_in_thread() bu handler'ları otomatik çağırır.
"""
def message_handler(raw_message):
if raw_message['type'] != 'message':
return
try:
message = json.loads(raw_message['data'])
handler(message)
except Exception as e:
logger.error(f"Handler hatası [{channel}]: {e}", exc_info=True)
self.pubsub.subscribe(**{channel: message_handler})
logger.info(f"Handler kaydedildi: {channel}")
def start(self, sleep_time: float = 0.01):
"""
Arka planda thread başlatır.
sleep_time: döngü aralığı (saniye) - çok düşük yapma, CPU yer
"""
self._thread = self.pubsub.run_in_thread(
sleep_time=sleep_time,
daemon=True
)
logger.info("Subscriber thread başlatıldı")
return self._thread
def stop(self):
if self._thread:
self._thread.stop()
self.pubsub.close()
logger.info("Subscriber thread durduruldu")
# Kullanım
if __name__ == '__main__':
manager = ThreadedSubscriberManager()
manager.register_handler(
'orders',
lambda msg: print(f"Sipariş eventi: {msg['event_type']}")
)
manager.register_handler(
'inventory',
lambda msg: print(f"Stok eventi: {msg['event_type']}")
)
manager.register_handler(
'payments',
lambda msg: print(f"Ödeme eventi: {msg['event_type']}")
)
thread = manager.start()
try:
# Ana thread başka işler yapabilir
while True:
time.sleep(1)
except KeyboardInterrupt:
manager.stop()
sleep_time parametresi konusunda bir not: 0.001 gibi çok küçük değerler CPU kullanımını gereksiz yere artırır. 0.01 ile 0.1 arasında bir değer çoğu senaryo için yeterlidir.
Monitoring ve Observability
Pub/Sub sistemlerinde ne kadar mesaj geçtiğini, subscriber sayısını ve kanal durumunu izlemek operasyonel açıdan çok önemli:
import redis
import json
from redis_connection import RedisConnection
class PubSubMonitor:
def __init__(self):
self.client = RedisConnection.get_client()
def get_channel_info(self, pattern: str = '*') -> dict:
"""
Aktif kanalları ve subscriber sayılarını döner
"""
channels = self.client.pubsub_channels(pattern)
result = {}
if channels:
numsub = self.client.pubsub_numsub(*channels)
for channel, count in zip(numsub[::2], numsub[1::2]):
result[channel] = int(count)
return result
def get_pattern_subscribers(self) -> int:
"""Pattern subscriber sayısını döner"""
return self.client.pubsub_numpat()
def health_check(self) -> dict:
try:
ping = self.client.ping()
channels = self.get_channel_info()
total_subscribers = sum(channels.values())
return {
'status': 'healthy' if ping else 'unhealthy',
'active_channels': len(channels),
'total_subscribers': total_subscribers,
'channels': channels,
'pattern_subscribers': self.get_pattern_subscribers()
}
except redis.RedisError as e:
return {
'status': 'error',
'error': str(e)
}
# Basit bir monitoring endpoint - Flask ile
# from flask import Flask, jsonify
# app = Flask(__name__)
# monitor = PubSubMonitor()
#
# @app.route('/health/pubsub')
# def pubsub_health():
# return jsonify(monitor.health_check())
Bu monitoring yapısını Prometheus’a beslemek için prometheus_client kütüphanesiyle kolayca entegre edebilirsiniz. active_channels ve total_subscribers metriklerini gauge olarak tanımlayıp 30 saniyede bir güncellemek yeterli.
Dikkat Edilmesi Gereken Noktalar
Production’da öğrendiğim birkaç önemli detayı sıralayayım:
- Mesaj boyutu sınırı: Redis mesajları için resmi bir limit olmasa da büyük payload’lar (1MB+) performansı ciddi etkiler. Büyük veriler için mesajda sadece referans ID taşıyıp asıl veriyi başka bir yerden çekin.
- Blocking subscribe ve timeout:
pubsub.listen()bloklayıcıdır. Uygulamanın başka işler yapması gerekiyorsa threading veya asyncio kullanın.
- Subscribe onay mesajları: Bir kanala abone olunduğunda Redis bir
subscribetipi mesaj gönderir. Bu mesajları filtreleyin, yoksa handler’larınız gereksiz çağrılır.
- Redis cluster’da Pub/Sub: Cluster modunda bir node’a publish edilen mesaj sadece o node’daki subscriber’lara ulaşır. Tüm node’lara ulaşmasını istiyorsanız
PUBLISHkomutunu broadcast modunda veyaredis-py-clusterkütüphanesiyle kullanmalısınız.
- Connection sayısı: Her subscriber ayrı bir bağlantı tüketir. 10 farklı servisten 50’şer subscriber açarsanız 500 bağlantı eder.
maxclientsayarını ve connection havuzlarınızı buna göre boyutlandırın.
- Mesaj sıralaması: Aynı kanaldaki mesajlar gönderilme sırasıyla alınır, ama farklı kanallar arasında sıralama garantisi yoktur.
Sonuç
Redis Pub/Sub, doğru senaryoda kullanıldığında son derece güçlü ve düşük overhead’li bir çözümdür. Python ekosistemi ile entegrasyonu da redis-py sayesinde oldukça pürüzsüz.
Özetlemek gerekirse: pub/sub sisteminizi tasarlarken önce “mesaj kaybolursa ne olur?” sorusunu yanıtlayın. Cevabınız “sorun olmaz, bildirim niteliğinde” ise Pub/Sub tam size göre. Cevabınız “kesinlikle işlenmeli” ise Redis Streams veya daha güçlü bir broker düşünün.
Retry mekanizmasını, graceful shutdown’ı ve monitoring’i baştan tasarlayın. Bunları sonradan eklemeye çalışmak production’da ciddi vakit kaybına neden olur. Ve son olarak: subscriber sayısını gözleyin. Birden fazla servis aynı kanala abone olduğunda publisher tarafındaki 0 alıcı durumları sizi uyarmalı, çünkü o mesajlar bir daha gelmeyecek.
