Redis ile Session Yönetimi ve Cache Stratejileri

Üretim ortamında Redis’i ilk kez ciddiye aldığım gün, bir e-ticaret sitesinin kampanya saatlerinde database connection pool’unun nasıl patladığını izliyordum. Her kullanıcı isteği session için ayrı bir MySQL sorgusu yapıyor, aynı anda binlerce kullanıcı sisteme giriyordu ve sonuç tahmin edilebilirdi: timeout üstüne timeout. O günden sonra Redis benim için “opsiyonel bir cache katmanı” olmaktan çıktı, mimarinin ayrılmaz bir parçası haline geldi.

Bu yazıda Redis’i sadece “hızlı bir key-value store” olarak değil, session yönetimi ve cache stratejileri açısından gerçekten nasıl kullanmanız gerektiğini anlatacağım. Pub/Sub mekanizmasını da işin içine katarak mesaj kuyruğu boyutuna da değineceğim.

Redis Session Yönetimi: Neden ve Nasıl

Klasik sticky session yaklaşımı, yani kullanıcıyı hep aynı sunucuya yönlendirmek, bir noktaya kadar işe yarıyor. Ama load balancer katmanı karmaşıklaştığında, sunucu sayısı artığında ya da Kubernetes’e geçtiğinizde bu yaklaşım sizi boğmaya başlar. Redis bu noktada merkezi bir session store olarak devreye girer.

Temel mantık şu: Her uygulama sunucusu, session verilerini kendi belleğinde tutmak yerine Redis’e yazar ve Redis’ten okur. Hangi sunucuya düşerse düşsün, kullanıcı session’ına tutarlı biçimde erişir.

Redis’i Session Store Olarak Yapılandırma

Önce Redis kurulumunu ve temel session yapısını görelim:

# Redis kurulumu (Ubuntu/Debian)
sudo apt update && sudo apt install redis-server -y

# Redis konfigürasyonu - session için önemli ayarlar
sudo vim /etc/redis/redis.conf

Session yönetimi için redis.conf içinde şu ayarlara dikkat etmeniz gerekiyor:

# Maksimum bellek limiti (örnek: 2GB)
maxmemory 2gb

# Session verisi için en uygun eviction policy
maxmemory-policy allkeys-lru

# Persistence - session verisi için AOF önerilir
appendonly yes
appendfsync everysec

# TCP keepalive
tcp-keepalive 300

# Timeout - idle bağlantıları kapat
timeout 0

Şimdi bir Python uygulamasında Redis session yönetimini nasıl implemente edeceğimize bakalım. Flask + redis-py kombinasyonunu kullanıyorum çünkü konsepti net gösteriyor:

pip install flask redis flask-session
# session_manager.py
import redis
import json
import uuid
from datetime import timedelta

class RedisSessionManager:
    def __init__(self, redis_host='localhost', redis_port=6379, 
                 session_ttl=3600):
        self.client = redis.Redis(
            host=redis_host,
            port=redis_port,
            db=0,
            decode_responses=True,
            socket_connect_timeout=5,
            socket_timeout=5,
            retry_on_timeout=True
        )
        self.session_ttl = session_ttl
        self.session_prefix = "sess:"
    
    def create_session(self, user_data: dict) -> str:
        session_id = str(uuid.uuid4())
        key = f"{self.session_prefix}{session_id}"
        
        # Session verisini JSON olarak sakla
        self.client.setex(
            name=key,
            time=timedelta(seconds=self.session_ttl),
            value=json.dumps(user_data)
        )
        return session_id
    
    def get_session(self, session_id: str) -> dict:
        key = f"{self.session_prefix}{session_id}"
        data = self.client.get(key)
        
        if data is None:
            return None
        
        # Her erişimde TTL'i yenile (sliding expiration)
        self.client.expire(key, self.session_ttl)
        return json.loads(data)
    
    def delete_session(self, session_id: str) -> bool:
        key = f"{self.session_prefix}{session_id}"
        return bool(self.client.delete(key))
    
    def update_session(self, session_id: str, updates: dict) -> bool:
        current = self.get_session(session_id)
        if current is None:
            return False
        
        current.update(updates)
        key = f"{self.session_prefix}{session_id}"
        self.client.setex(
            name=key,
            time=timedelta(seconds=self.session_ttl),
            value=json.dumps(current)
        )
        return True

Bu yapıda dikkat etmeniz gereken nokta “sliding expiration” kullanıyor olmamız. Yani kullanıcı her işlem yaptığında TTL sıfırlanıyor. Aktif bir kullanıcının session’ı hiçbir zaman düşmüyor ama 1 saat hareketsiz kalan kullanıcı otomatik olarak çıkış yapıyor.

Session Güvenliği: Redis Prefix ve Namespace Yönetimi

Birden fazla uygulama aynı Redis instance’ını kullanıyorsa namespace karışıklığı ciddi sorun çıkarır. Bunu önlemek için katmanlı bir prefix yapısı kullanın:

# Redis CLI ile namespace kontrolü
redis-cli

# Tüm session key'lerini listele (production'da dikkatli kullanın!)
SCAN 0 MATCH "sess:*" COUNT 100

# Belirli bir session'ın TTL'ini kontrol et
TTL sess:a1b2c3d4-e5f6-7890-abcd-ef1234567890

# Session içeriğini görüntüle
GET sess:a1b2c3d4-e5f6-7890-abcd-ef1234567890

# Aktif session sayısını öğren
redis-cli --scan --pattern "sess:*" | wc -l

Production ortamında KEYS * komutunu kesinlikle kullanmayın. Bu komut Redis’i blokladığı için anlık bir donmaya yol açar. SCAN komutu cursor tabanlı çalıştığı için çok daha güvenlidir.

Cache Stratejileri: Teoriden Pratiğe

Cache konusunda herkesin bildiği ama çoğunun yanlış uyguladığı birkaç pattern var. Bunları gerçek senaryolarla ele alalım.

Cache-Aside (Lazy Loading)

En yaygın pattern budur. Uygulama önce cache’e bakar, yoksa veritabanından çeker ve cache’e yazar:

# cache_strategies.py
import redis
import json
import hashlib
from functools import wraps

redis_client = redis.Redis(host='localhost', port=6379, 
                           decode_responses=True)

def cache_aside(ttl=300, prefix="cache"):
    """
    Cache-aside decorator
    Kullanım: @cache_aside(ttl=600, prefix="product")
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Cache key oluştur
            cache_key = f"{prefix}:{func.__name__}:"
            key_data = json.dumps({"args": args, "kwargs": kwargs}, 
                                   sort_keys=True)
            cache_key += hashlib.md5(key_data.encode()).hexdigest()
            
            # Cache'e bak
            cached = redis_client.get(cache_key)
            if cached:
                return json.loads(cached)
            
            # Cache miss - veritabanından çek
            result = func(*args, **kwargs)
            
            if result is not None:
                redis_client.setex(
                    cache_key, 
                    ttl, 
                    json.dumps(result)
                )
            
            return result
        return wrapper
    return decorator

# Kullanım örneği
@cache_aside(ttl=600, prefix="product")
def get_product(product_id: int):
    # Normalde burada DB sorgusu olur
    # Örnek amaçlı statik veri döndürüyoruz
    return {"id": product_id, "name": "Ürün", "price": 99.90}

Write-Through Cache

Veri yazıldığında hem cache hem de veritabanı güncellenir. Okuma performansı kritikse bu pattern tercih edilir:

# write_through.py
class WriteThroughCache:
    def __init__(self, redis_client, db_connection):
        self.redis = redis_client
        self.db = db_connection
        self.ttl = 3600
    
    def write(self, key: str, data: dict) -> bool:
        try:
            # Önce veritabanına yaz
            self.db.execute(
                "UPDATE products SET data = %s WHERE id = %s",
                (json.dumps(data), data['id'])
            )
            self.db.commit()
            
            # Başarılıysa cache'i güncelle
            cache_key = f"product:{key}"
            self.redis.setex(cache_key, self.ttl, json.dumps(data))
            
            return True
        except Exception as e:
            # DB yazma başarısız olduysa cache'i de temizle
            self.redis.delete(f"product:{key}")
            raise e
    
    def read(self, key: str) -> dict:
        cache_key = f"product:{key}"
        cached = self.redis.get(cache_key)
        
        if cached:
            return json.loads(cached)
        
        # Cache miss - DB'den çek ve cache'e yaz
        result = self.db.execute(
            "SELECT * FROM products WHERE id = %s", (key,)
        ).fetchone()
        
        if result:
            self.redis.setex(cache_key, self.ttl, json.dumps(dict(result)))
        
        return dict(result) if result else None

Cache Invalidation Stratejileri

Cache’in en zor kısmı invalidation’dır. “There are only two hard things in Computer Science: cache invalidation and naming things” sözü boşuna söylenmemiş. Benim tercih ettiğim yaklaşım tag tabanlı invalidation:

# tag_based_invalidation.py
class TaggedCache:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.tag_prefix = "tag:"
    
    def set_with_tags(self, key: str, value, ttl: int, 
                       tags: list) -> None:
        pipe = self.redis.pipeline()
        
        # Ana veriyi yaz
        pipe.setex(key, ttl, json.dumps(value))
        
        # Her tag için key'i kaydet
        for tag in tags:
            tag_key = f"{self.tag_prefix}{tag}"
            pipe.sadd(tag_key, key)
            pipe.expire(tag_key, ttl + 60)  # Tag biraz daha uzun yaşasın
        
        pipe.execute()
    
    def invalidate_by_tag(self, tag: str) -> int:
        tag_key = f"{self.tag_prefix}{tag}"
        keys = self.redis.smembers(tag_key)
        
        if not keys:
            return 0
        
        pipe = self.redis.pipeline()
        for key in keys:
            pipe.delete(key)
        pipe.delete(tag_key)
        pipe.execute()
        
        return len(keys)

# Kullanım
tagged_cache = TaggedCache(redis_client)

# Ürün verisi kaydet, kategori ve marka tag'leriyle ilişkilendir
tagged_cache.set_with_tags(
    key="product:1234",
    value={"id": 1234, "name": "Laptop", "category": "elektronik"},
    ttl=3600,
    tags=["category:elektronik", "brand:xyz", "product:1234"]
)

# Tüm elektronik ürünlerin cache'ini temizle
invalidated_count = tagged_cache.invalidate_by_tag("category:elektronik")
print(f"{invalidated_count} cache kaydı temizlendi")

Redis Pub/Sub ile Mesaj Kuyruğu

Redis’in Pub/Sub mekanizması, cache invalidation’ı mikroservis mimarisinde çok daha zarif çözüyor. Aynı zamanda gerçek zamanlı bildirimler, event broadcasting ve servisler arası iletişim için kullanışlı.

Pub/Sub’ın List tabanlı queue’dan farkı şu: Pub/Sub’da mesaj “şu anda dinleyen”lere iletilir, geçmişe dönük mesajlar saklanmaz. Eğer bir subscriber o an bağlı değilse mesajı kaçırır. Bu bir eksiklik değil, tasarım kararı. Kalıcı mesaj kuyruğu için Redis Stream’lere bakmalısınız ama bugünkü konumuz Pub/Sub.

# pubsub_cache_invalidation.py
import redis
import json
import threading
import time

class CacheInvalidationPublisher:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.channel = "cache:invalidation"
    
    def publish_invalidation(self, cache_keys: list, 
                              reason: str = "update") -> None:
        message = {
            "keys": cache_keys,
            "reason": reason,
            "timestamp": time.time()
        }
        self.redis.publish(self.channel, json.dumps(message))
        print(f"Invalidation mesajı yayınlandı: {cache_keys}")

class CacheInvalidationSubscriber:
    def __init__(self, redis_host='localhost', redis_port=6379):
        # Subscriber için ayrı bir bağlantı kullanın!
        self.redis = redis.Redis(host=redis_host, port=redis_port,
                                  decode_responses=True)
        self.pubsub = self.redis.pubsub()
        self.local_cache = {}
        self.running = False
    
    def handle_invalidation(self, message):
        if message['type'] != 'message':
            return
        
        data = json.loads(message['data'])
        keys_to_invalidate = data.get('keys', [])
        
        for key in keys_to_invalidate:
            if key in self.local_cache:
                del self.local_cache[key]
                print(f"Local cache temizlendi: {key}")
    
    def start_listening(self):
        self.pubsub.subscribe(**{
            "cache:invalidation": self.handle_invalidation
        })
        self.running = True
        
        # Ayrı thread'de çalıştır
        thread = threading.Thread(target=self._listen_loop, daemon=True)
        thread.start()
        return thread
    
    def _listen_loop(self):
        while self.running:
            self.pubsub.get_message(ignore_subscribe_messages=True, 
                                     timeout=1.0)
            time.sleep(0.01)
    
    def stop(self):
        self.running = False
        self.pubsub.unsubscribe()
        self.pubsub.close()

Pub/Sub’ı production’da kullanırken dikkat etmeniz gereken en kritik nokta: subscriber bağlantılarını publisher bağlantılarından ayırın. Aynı connection objesi üzerinden hem pub/sub hem de normal Redis operasyonları yapmaya çalışmak sorun çıkarır.

Redis List ile Basit İş Kuyruğu

Pub/Sub’ın yanı sıra, özellikle e-posta gönderimi veya rapor üretimi gibi asenkron işler için Redis List’i basit bir job queue olarak kullanabilirsiniz:

# job_queue.py
import redis
import json
import time

redis_client = redis.Redis(host='localhost', port=6379, 
                           decode_responses=True)

class SimpleJobQueue:
    def __init__(self, queue_name: str):
        self.queue_name = queue_name
        self.processing_key = f"{queue_name}:processing"
    
    def enqueue(self, job_data: dict) -> str:
        job_id = f"job:{int(time.time() * 1000)}"
        job_data['job_id'] = job_id
        job_data['enqueued_at'] = time.time()
        
        # LPUSH ile kuyruğun başına ekle
        redis_client.lpush(self.queue_name, json.dumps(job_data))
        
        # Job meta verisi - 24 saat sakla
        redis_client.setex(f"job:meta:{job_id}", 86400, 
                           json.dumps({"status": "queued", 
                                       "data": job_data}))
        return job_id
    
    def dequeue(self, timeout: int = 30) -> dict:
        # BRPOP - blocking pop, kuyrukta iş yoksa timeout kadar bekle
        result = redis_client.brpop(self.queue_name, timeout=timeout)
        
        if result is None:
            return None
        
        _, job_json = result
        job_data = json.loads(job_json)
        
        # İşleme alındı olarak işaretle
        redis_client.setex(
            f"job:meta:{job_data['job_id']}", 
            86400,
            json.dumps({"status": "processing", "data": job_data})
        )
        
        return job_data
    
    def complete_job(self, job_id: str, result=None) -> None:
        redis_client.setex(
            f"job:meta:{job_id}",
            3600,  # Tamamlanan işleri 1 saat sakla
            json.dumps({"status": "completed", "result": result})
        )
    
    def queue_length(self) -> int:
        return redis_client.llen(self.queue_name)

# Kullanım
email_queue = SimpleJobQueue("email:queue")

# Job ekle
job_id = email_queue.enqueue({
    "to": "[email protected]",
    "subject": "Siparişiniz alındı",
    "template": "order_confirmation",
    "order_id": 12345
})

print(f"Job kuyruğa eklendi: {job_id}")
print(f"Kuyrukta bekleyen iş sayısı: {email_queue.queue_length()}")

Production’da Redis: Monitoring ve Sorun Giderme

Tüm bu stratejileri kurarken izlemeniz gereken metrikler var. Redis’in INFO komutu inanılmaz miktarda bilgi döndürür:

# Genel Redis durumu
redis-cli INFO all

# Sadece memory bilgisi
redis-cli INFO memory

# Keyspace hit/miss oranı - cache etkinliğini ölçer
redis-cli INFO stats | grep -E "keyspace_hits|keyspace_misses"

# Bağlantı sayısı
redis-cli INFO clients | grep connected_clients

# Slowlog - yavaş komutları izle (10ms üzeri)
redis-cli CONFIG SET slowlog-log-slower-than 10000
redis-cli SLOWLOG GET 25

# Memory kullanımı ve fragmentation
redis-cli INFO memory | grep -E "used_memory_human|mem_fragmentation_ratio"

Cache hit rate hesaplaması için şu formülü kullanıyorum:

# Cache hit rate hesapla
redis-cli INFO stats | grep keyspace | awk -F: '
{
  for(i=1;i<=NF;i++) {
    if($i ~ /keyspace_hits/) hits=$(i+1)
    if($i ~ /keyspace_misses/) misses=$(i+1)
  }
}
END {
  total = hits + misses
  if(total > 0) printf "Hit Rate: %.2f%%n", (hits/total)*100
  else print "Henüz veri yok"
}'

Genel kural olarak cache hit rate’in %80’in altına düşmesi ciddi bir sorun işareti. Ya TTL’leriniz çok kısa, ya cache boyutunuz yetersiz, ya da cache key’leriniz tutarsız.

Sonuç

Redis’i session ve cache yönetimi için doğru kullanmak, uygulamanızın ölçeklenebilirliğini dramatik biçimde artırır. Ama “Redis’e at, hallolur” mantığıyla yaklaşmak da beraberinde sorunlar getirir. Yazı boyunca anlattıklarımı özetlersek:

  • Session yönetiminde sliding expiration kullanın, prefix’leri tutarlı tutun ve production’da KEYS * yerine SCAN kullanın.
  • Cache stratejisi seçiminde okuma/yazma oranınıza bakın. Okuma yoğunsa write-through, yazma yoğunsa cache-aside daha uygun.
  • Cache invalidation’da tag tabanlı yaklaşım uzun vadede sizi kurtarır. Tek tek key silmek ölçeklenmez.
  • Pub/Sub kullanımında subscriber bağlantılarını ayrı tutun ve mesaj kaybını tolere edemiyorsanız Redis Stream’lere geçin.
  • Monitoring’i ihmal etmeyin. Hit rate, memory fragmentation ve slow log düzenli takip edilmesi gereken metrikler.

Redis’i gerçekten kavramak, sadece SET ve GET bilmekten fazlasını gerektiriyor. Data structure’larını, eviction policy’lerini ve persistence seçeneklerini iş gereksinimlerinize göre doğru konfigure ettiğinizde Redis, mimarinizin en güvenilir bileşenlerinden biri haline gelir. O ilk kampanya gününden beri Redis konfigürasyonuma dokunmadan geçen her kampanya günü, bunun ne kadar doğru bir karar olduğunu kanıtlıyor.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir