Redis Pipeline ve Batch İşlemleri ile Ağ Gecikmesini Azaltma

Redis kullanıyorsanız ve uygulama performansından şikayetçiyseniz, büyük ihtimalle sorun ağ gecikmesinden kaynaklanıyordur. Her Redis komutu için ayrı ayrı network round-trip yapıyorsanız, bu durum özellikle yüksek trafikli sistemlerde ciddi bir darboğaza dönüşür. Pipeline ve batch işlemleri tam da bu problemi çözmek için var ve doğru kullanıldığında uygulama performansını dramatik biçimde iyileştirebilir.

Redis’te Ağ Gecikmesi Neden Önemli?

Redis kendi başına son derece hızlı bir araç. Bellek tabanlı olduğu için okuma/yazma işlemleri mikrosaniye mertebesinde gerçekleşiyor. Ama iş ağa gelince durum farklılaşıyor.

Tipik bir Redis istek-yanıt döngüsünü düşünelim. Uygulama sunucunuz Redis’e bir komut gönderir, Redis komutu işler ve yanıt döner. Bu tek bir round-trip. Eğer uygulama sunucunuz ile Redis aynı veri merkezindeyse bu gecikme 0.5-1ms civarında olabilir. Farklı veri merkezlerindeyse 5-10ms’ye çıkabilir. Kulağa çok az gelir, değil mi?

Ama şunu düşünün: Bir kullanıcı isteği işlenirken 50 farklı Redis komutu çalışıyorsa, bu 50 x 1ms = 50ms demek. Sadece Redis iletişimi için. Bu rakam gerçek dünya senaryolarında çok daha fazla olabilir. İşte pipeline burada devreye giriyor.

RTT (Round Trip Time) kavramını iyi anlamak gerekiyor. Redis pipeline, birden fazla komutu tek bir network paketi içinde gönderip tüm yanıtları toplu olarak almayı sağlar. Bu sayede 50 komut için 50 RTT yerine sadece 1 RTT ödüyorsunuz.

Pipeline Olmadan vs Pipeline ile: Pratik Karşılaştırma

Önce pipeline kullanmadan yapılan işlemlere bakalım. Aşağıdaki Python örneği, 1000 anahtar-değer çiftini sırayla Redis’e yazıyor:

import redis
import time

r = redis.Redis(host='localhost', port=6379, db=0)

# Pipeline olmadan - her komut ayrı network çağrısı
start_time = time.time()
for i in range(1000):
    r.set(f"user:session:{i}", f"data_{i}", ex=3600)

elapsed = time.time() - start_time
print(f"Pipeline olmadan: {elapsed:.3f} saniye")
# Tipik çıktı: Pipeline olmadan: 2.847 saniye

Şimdi aynı işlemi pipeline ile yapalım:

import redis
import time

r = redis.Redis(host='localhost', port=6379, db=0)

# Pipeline ile - tüm komutlar tek seferde gönderiliyor
start_time = time.time()
pipe = r.pipeline()
for i in range(1000):
    pipe.set(f"user:session:{i}", f"data_{i}", ex=3600)
pipe.execute()

elapsed = time.time() - start_time
print(f"Pipeline ile: {elapsed:.3f} saniye")
# Tipik çıktı: Pipeline ile: 0.089 saniye

Bu fark gerçekten çarpıcı. 1000 işlem için yaklaşık 32 kat performans artışı. Gecikme değerleri network koşullarına göre değişir ama pipeline her durumda dramatik iyileştirme sağlar.

Redis Pipeline’ın Çalışma Mekanizması

Pipeline’ı anlamak için Redis’in istek-yanıt modelini kavramak gerekiyor. Normal modda her komut şu döngüyü takip eder:

  • İstemci komutu gönderir
  • Ağ üzerinden Redis’e ulaşır
  • Redis işler ve yanıt hazırlar
  • Yanıt ağ üzerinden istemciye döner
  • İstemci bir sonraki komutu gönderir

Pipeline modunda ise:

  • İstemci tüm komutları buffer’a yazar
  • Hepsi bir seferde network’e gönderilir
  • Redis komutları sırayla işler
  • Tüm yanıtlar tek seferde istemciye döner

Önemli bir nokta: Pipeline atomik değildir. Komutlar sırayla işlenir ama aralarına başka istemcilerin komutları girebilir. Atomiklik istiyorsanız MULTI/EXEC ile transaction kullanmanız gerekir.

Gerçek Dünya Senaryosu: E-ticaret Sepet Yönetimi

Bir e-ticaret uygulamasında kullanıcı sepetini yönetmek için pipeline kullanalım. Bu senaryo çok yaygın ve pipeline’dan en çok fayda gören kullanım durumlarından biri:

import redis
import json
from typing import List, Dict

class CartManager:
    def __init__(self):
        self.r = redis.Redis(host='localhost', port=6379, db=0, 
                            decode_responses=True)
    
    def add_items_to_cart(self, user_id: str, items: List[Dict]):
        """
        Birden fazla ürünü sepete pipeline ile ekler.
        Her ürün için: sepete ekle, stok güncelle, log yaz
        """
        cart_key = f"cart:{user_id}"
        pipe = self.r.pipeline()
        
        for item in items:
            product_id = item['product_id']
            quantity = item['quantity']
            price = item['price']
            
            # Sepete ürün ekle (hash yapısı)
            pipe.hset(cart_key, product_id, 
                     json.dumps({'qty': quantity, 'price': price}))
            
            # Sepet toplam değerini güncelle
            pipe.hincrbyfloat(f"cart_meta:{user_id}", 
                            'total', quantity * price)
            
            # Stok rezervasyonu için set'e ekle
            pipe.sadd(f"reserved_stock:{product_id}", user_id)
            
            # Sepet TTL'ini yenile (24 saat)
            pipe.expire(cart_key, 86400)
        
        results = pipe.execute()
        return results
    
    def get_cart_details(self, user_ids: List[str]) -> Dict:
        """
        Birden fazla kullanıcının sepetini aynı anda getir
        """
        pipe = self.r.pipeline()
        
        for user_id in user_ids:
            pipe.hgetall(f"cart:{user_id}")
            pipe.hgetall(f"cart_meta:{user_id}")
        
        results = pipe.execute()
        
        # Sonuçları kullanıcı bazında organize et
        carts = {}
        for i, user_id in enumerate(user_ids):
            carts[user_id] = {
                'items': results[i * 2],
                'meta': results[i * 2 + 1]
            }
        
        return carts

# Kullanım örneği
manager = CartManager()
items = [
    {'product_id': 'SKU001', 'quantity': 2, 'price': 149.99},
    {'product_id': 'SKU002', 'quantity': 1, 'price': 299.99},
    {'product_id': 'SKU003', 'quantity': 3, 'price': 49.99},
]
manager.add_items_to_cart('user_12345', items)

Batch İşlemleri ile Toplu Veri Güncellemeleri

Pipeline ve batch kavramları bazen birbirine karıştırılıyor. Pipeline teknik bir mekanizma iken, batch işlemleri daha çok veriyi gruplar halinde işleme stratejisidir. İkisini birlikte kullanmak en güçlü sonucu verir.

Gerçek dünya örneği olarak bir analitik sistemi düşünelim. Kullanıcı davranış verilerini toplu olarak Redis’e yazıyoruz:

# Redis CLI ile pipeline kullanımı - pipe modu
# Büyük veri setlerini import ederken kullanışlı

cat user_events.txt | redis-cli --pipe

# user_events.txt formatı (Redis protocol formatı):
# *3
# $3
# SET
# $20
# user:event:1234
# $15
# click_homepage

# Alternatif: --pipe-mode ile benchmark
redis-cli --pipe-mode --pipe-timeout 5 < bulk_commands.txt

# Pipeline performansını test etmek için
redis-benchmark -n 100000 -c 50 -P 16 -q
# -P 16: 16 komutluk pipeline grupları kullan
# -c 50: 50 paralel bağlantı
# -n 100000: toplam 100000 istek

Şimdi Node.js ile gerçekçi bir batch işlem örneği görelim. Log aggregation sistemi:

const redis = require('ioredis');

const client = new redis({
  host: 'localhost',
  port: 6379,
  maxRetriesPerRequest: 3
});

class LogAggregator {
  constructor() {
    this.batchSize = 100; // Her pipeline'da 100 komut
    this.buffer = [];
  }

  async flushBuffer() {
    if (this.buffer.length === 0) return;

    const pipeline = client.pipeline();
    const timestamp = Math.floor(Date.now() / 1000);
    const dateKey = new Date().toISOString().split('T')[0];

    for (const log of this.buffer) {
      const { userId, action, metadata } = log;
      
      // Günlük aksiyon sayacı
      pipeline.hincrby(`stats:daily:${dateKey}`, action, 1);
      
      // Kullanıcı aktivite skoru (son 1 saat)
      pipeline.zadd(`user:activity:${userId}`, timestamp, 
                    JSON.stringify({ action, metadata }));
      
      // Zaman damgalı log kaydı
      pipeline.lpush(`logs:${action}`, 
                     JSON.stringify({ userId, timestamp, metadata }));
      
      // Log listesini 1000 ile sınırla
      pipeline.ltrim(`logs:${action}`, 0, 999);
      
      // Aktif kullanıcı seti (son 5 dakika)
      pipeline.zadd('active_users', timestamp, userId);
    }

    // Eski aktif kullanıcıları temizle (5 dakikadan eski)
    const fiveMinAgo = timestamp - 300;
    pipeline.zremrangebyscore('active_users', 0, fiveMinAgo);

    try {
      const results = await pipeline.exec();
      console.log(`${this.buffer.length} log kaydı işlendi`);
      this.buffer = [];
      return results;
    } catch (error) {
      console.error('Pipeline hatası:', error);
      throw error;
    }
  }

  addLog(userId, action, metadata = {}) {
    this.buffer.push({ userId, action, metadata });
    
    if (this.buffer.length >= this.batchSize) {
      this.flushBuffer();
    }
  }
}

// Periyodik flush için interval
const aggregator = new LogAggregator();
setInterval(() => aggregator.flushBuffer(), 1000); // Her saniye flush

module.exports = aggregator;

Pipeline Chunk Stratejisi: Büyük Veri Setlerini Yönetmek

Pipeline kullanırken dikkat edilmesi gereken önemli bir nokta var: Çok büyük pipeline’lar Redis server’ında bellek baskısı yaratabilir. 10.000 komutluk tek bir pipeline yerine, 500-1000 komutluk chunk’lara bölmek daha sağlıklı bir yaklaşım:

import redis
from typing import List, Any
import math

def chunked_pipeline(r: redis.Redis, operations: List[tuple], 
                     chunk_size: int = 500) -> List[Any]:
    """
    Büyük işlem listelerini chunk'lara bölerek pipeline ile çalıştırır.
    
    operations: [(method_name, args, kwargs), ...] formatında liste
    chunk_size: Her pipeline'daki maksimum komut sayısı
    """
    all_results = []
    total_chunks = math.ceil(len(operations) / chunk_size)
    
    for chunk_num in range(total_chunks):
        start_idx = chunk_num * chunk_size
        end_idx = min(start_idx + chunk_size, len(operations))
        chunk = operations[start_idx:end_idx]
        
        pipe = r.pipeline()
        
        for method_name, args, kwargs in chunk:
            getattr(pipe, method_name)(*args, **kwargs)
        
        try:
            chunk_results = pipe.execute()
            all_results.extend(chunk_results)
            print(f"Chunk {chunk_num + 1}/{total_chunks} tamamlandı "
                  f"({len(chunk)} işlem)")
        except Exception as e:
            print(f"Chunk {chunk_num + 1} hatası: {e}")
            raise
    
    return all_results


# Kullanım örneği: 10000 kullanıcı profili güncelleme
r = redis.Redis(host='localhost', port=6379, decode_responses=True)

operations = []
for i in range(10000):
    operations.append(('hset', [f'user:{i}', 'last_login', '2024-01-15'], {}))
    operations.append(('expire', [f'user:{i}', 7200], {}))

results = chunked_pipeline(r, operations, chunk_size=500)
print(f"Toplam {len(results)} işlem tamamlandı")

MULTI/EXEC ile Atomik İşlemler

Bazen pipeline yeterli olmaz ve atomiklik şarttır. Banka transferi veya envanter güncellemesi gibi durumlarda MULTI/EXEC kullanın:

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def transfer_credits(from_user: str, to_user: str, amount: int) -> bool:
    """
    Kullanıcılar arası kredi transferi - atomik olmalı.
    WATCH ile optimistic locking kullanıyoruz.
    """
    from_key = f"credits:{from_user}"
    to_key = f"credits:{to_user}"
    
    with r.pipeline() as pipe:
        max_attempts = 3
        
        for attempt in range(max_attempts):
            try:
                # İzlemeye başla - bu key'ler değişirse transaction iptal
                pipe.watch(from_key, to_key)
                
                # Mevcut bakiyeyi kontrol et
                from_balance = int(pipe.get(from_key) or 0)
                
                if from_balance < amount:
                    pipe.unwatch()
                    print(f"Yetersiz bakiye: {from_balance}")
                    return False
                
                # Transaction başlat
                pipe.multi()
                pipe.decrby(from_key, amount)
                pipe.incrby(to_key, amount)
                pipe.lpush(f"transfer_log:{from_user}", 
                          f"sent:{amount}:{to_user}")
                pipe.lpush(f"transfer_log:{to_user}", 
                          f"received:{amount}:{from_user}")
                pipe.execute()
                
                print(f"Transfer başarılı: {amount} kredi")
                return True
                
            except redis.WatchError:
                # Başka bir işlem key'i değiştirdi, tekrar dene
                print(f"Watch error, deneme {attempt + 1}/{max_attempts}")
                continue
    
    return False

# Test
r.set("credits:alice", 1000)
r.set("credits:bob", 500)
transfer_credits("alice", "bob", 250)
print(f"Alice: {r.get('credits:alice')}")  # 750
print(f"Bob: {r.get('credits:bob')}")      # 750

Redis Cluster Ortamında Pipeline

Redis Cluster kullanıyorsanız pipeline konusu biraz daha karmaşık hale gelir. Cluster modunda her key farklı bir node’da olabilir ve aynı pipeline içindeki komutlar farklı slot’lara ait olabilir:

# Redis Cluster kurulum kontrolü
redis-cli cluster info | grep cluster_state

# Cluster'da hangi key hangi slot'ta?
redis-cli cluster keyslot "user:12345"

# Hash tag kullanarak aynı slot'a zorlamak
# {user_12345} hash tag'i, tüm key'lerin aynı slot'a gitmesini sağlar
redis-cli cluster keyslot "{user_12345}:cart"
redis-cli cluster keyslot "{user_12345}:profile"
redis-cli cluster keyslot "{user_12345}:preferences"
# Hepsi aynı slot'ta olur, pipeline güvenle çalışır
import redis

# Cluster pipeline kullanımı
from rediscluster import RedisCluster

startup_nodes = [
    {"host": "redis-node1", "port": "7000"},
    {"host": "redis-node2", "port": "7001"},
    {"host": "redis-node3", "port": "7002"}
]

rc = RedisCluster(startup_nodes=startup_nodes, decode_responses=True)

# Cluster pipeline - aynı slot'taki key'ler için
user_id = "12345"
pipe = rc.pipeline()

# Hash tag kullanarak tüm key'leri aynı slot'a yönlendiriyoruz
pipe.set(f"{{{user_id}}}:session", "active", ex=3600)
pipe.hset(f"{{{user_id}}}:profile", "name", "Mehmet")
pipe.lpush(f"{{{user_id}}}:activity", "login")
pipe.expire(f"{{{user_id}}}:activity", 86400)

results = pipe.execute()
print("Cluster pipeline sonuçları:", results)

Monitoring: Pipeline Performansını İzlemek

Pipeline kullanımının etkisini ölçmek için bazı pratik yöntemler:

# Redis slowlog - yavaş komutları yakala
# Eşiği 10000 mikrosaniye (10ms) olarak ayarla
redis-cli CONFIG SET slowlog-log-slower-than 10000
redis-cli CONFIG SET slowlog-max-len 128

# Slowlog'u görüntüle
redis-cli SLOWLOG GET 10

# Redis INFO ile throughput izleme
redis-cli INFO stats | grep -E "total_commands_processed|instantaneous_ops_per_sec"

# Pipeline etkinliğini anlamak için
# Her saniyedeki işlem sayısını izle
watch -n 1 'redis-cli INFO stats | grep instantaneous_ops_per_sec'

# Memory kullanımını izle (pipeline'ın bellek üzerindeki etkisi)
redis-cli INFO memory | grep -E "used_memory_human|mem_fragmentation_ratio"

# Client listesi - bağlantı durumunu izle
redis-cli CLIENT LIST | grep -c "cmd=pipeline"

Yaygın Hatalar ve Çözümleri

Pipeline kullanırken sık karşılaşılan sorunlar ve bunlardan nasıl kaçınabileceğiniz:

Hata 1: Pipeline sonuçlarını yanlış okumak

Pipeline’da her komut için sonuç execute() listesinde sırayla döner. İndeks takibini kaybetmek kolaydır. Komutları gruplara ayırın ve sonuçları ona göre parse edin.

Hata 2: Çok büyük pipeline açmak

Tek pipeline’da 10.000+ komut göndermek Redis’in output buffer’ını zorlayabilir. client-output-buffer-limit ayarını kontrol edin ve chunk stratejisi kullanın.

Hata 3: Pipeline içinde SUBSCRIBE kullanmak

Pub/Sub komutları pipeline ile uyumlu değil. Bu komutlar için ayrı bağlantı kullanın.

Hata 4: Hata yönetimini atlamak

Pipeline’daki bir komut hata verirse, diğer komutlar çalışmaya devam eder. pipe.execute() çağrısında oluşan hataları mutlaka yakalayın ve her komutun sonucunu kontrol edin.

  • pipeline.execute(raise_on_error=False): Python’da hataları exception yerine liste içinde döndürür
  • Sonuç listesini iterate ederken isinstance kontrolü yapın
  • Atomiklik gereken durumlarda MULTI/EXEC kullanın, plain pipeline değil

Sonuç

Redis pipeline ve batch işlemleri, doğru senaryolarda kullanıldığında ağ gecikmesini dramatik biçimde azaltan güçlü araçlardır. Temel mesaj şu: Her Redis çağrısı için ayrı network round-trip ödemeyin.

Benim önerilerim pratikte şöyle şekilleniyor. Birden fazla bağımsız komut çalıştırıyorsanız, pipeline ilk tercihiniz olsun. Büyük veri setlerinde chunk stratejisini unutmayın, 500-1000 komutluk gruplar genellikle optimal noktadır. Atomiklik istiyorsanız MULTI/EXEC kullanın. Redis Cluster ortamında hash tag’lerle aynı slot yönetimini ihmal etmeyin.

Performans etkisini ölçmek için her zaman benchmarking yapın. redis-benchmark -P parametresi ile farklı pipeline boyutlarını test edebilirsiniz. Gerçek uygulamanızda da before/after ölçümü alın, çünkü network gecikmesi ortama göre çok değişiyor ve kazanım da ona göre farklılaşıyor.

Son olarak şunu söyleyeyim: Pipeline bir sihirli değnek değil. Okuma-yazma bağımlılığı olan işlemlerde kullanamıyorsunuz çünkü önceki komutun sonucuna göre sonraki komutu belirlemeniz gerekiyor. Ama bu kısıtlamayı bilip, uygun yerlerde pipeline’ı devreye soktuğunuzda Redis’in gerçek potansiyelini ortaya çıkarıyorsunuz.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir