Gerçek zamanlı veri işleme ihtiyacı her geçen gün artıyor. Kullanıcı davranış takibi, IoT sensör verileri, canlı bildirim sistemleri, log aggregation… Bunların hepsinde ortak bir sorun var: verileri hızlı üretip hızlı tüketmen gerekiyor. Redis Streams tam da bu noktada devreye giriyor ve bunu inanılmaz basit bir şekilde çözüyor.
Redis 5.0 ile hayatımıza giren Streams yapısı, Apache Kafka’ya benzer bir append-only log sistemi sunuyor. Ama Kafka’nın aksine Redis’in sadeliğini ve hızını koruyor. Eğer ekibinde Kafka kurup yönetecek bant genişliği yoksa ama yine de güçlü bir mesaj kuyruğu istiyorsan, Redis Streams tam senin için.
Redis Streams Nedir ve Neden Önemli?
Klasik Redis veri yapılarını düşündüğünde Lists, Sets, Sorted Sets aklına gelir. Bunlar güçlü ama gerçek zamanlı mesajlaşma için bazı eksiklikleri var. Örneğin bir List üzerinde birden fazla consumer’ın bağımsız olarak veri okuması karmaşıklaşıyor. Pub/Sub ise mesajı kaçırırsan gitti, geçmişe dönüp bakamıyorsun.
Streams bu sorunları şöyle çözüyor:
- Kalıcı depolama: Mesajlar silinene kadar stream’de kalır
- Consumer Groups: Birden fazla tüketici grubu aynı stream’i farklı offset’lerden okuyabilir
- Acknowledgment mekanizması: İşlenen mesajları onaylayabilirsin, onaylanmayanları tekrar işleyebilirsin
- Otomatik ID üretimi: Her mesaj benzersiz bir timestamp tabanlı ID alır
- Backpressure yönetimi: MAXLEN ile stream boyutunu kontrol edebilirsin
Temel Veri Yapısı ve Komutlar
Stream’de her kayıt bir ID ve key-value çiftlerinden oluşuyor. ID formatı - şeklinde. Bu sayede mesajların zamansal sırasını her zaman biliyorsun.
Veri Ekleme (XADD)
# Basit mesaj ekleme
redis-cli XADD mystream * sensor_id 123 temperature 45.2 humidity 67
# Çıktı: "1703001234567-0" (otomatik ID)
# Manuel ID belirleme (nadir durumlarda)
redis-cli XADD mystream 1703001234567-0 sensor_id 123 temperature 45.2
# MAXLEN ile stream boyutu sınırlama (yaklaşık 1000 kayıt tut)
redis-cli XADD mystream MAXLEN ~ 1000 * event_type "click" user_id 456 page "/home"
# Minimum ID ile eski kayıtları temizleme
redis-cli XADD mystream MINID ~ 1703001000000 * sensor_id 789 value 99
~ işareti burada önemli. Tam olarak 1000 demek değil, “yaklaşık 1000” demek. Bu Redis’e biraz esneklik tanıyor ve performansı artırıyor. Eğer kesin olarak 1000 istiyorsan = kullanırsın ama genellikle ~ daha mantıklı.
Veri Okuma (XREAD)
# En baştan tüm kayıtları oku
redis-cli XREAD COUNT 10 STREAMS mystream 0
# Sadece yeni gelen kayıtları izle (blocking mod)
redis-cli XREAD COUNT 10 BLOCK 0 STREAMS mystream $
# Belirli bir ID'den sonrasını oku
redis-cli XREAD COUNT 5 STREAMS mystream 1703001234567-0
# Birden fazla stream'i aynı anda izle
redis-cli XREAD COUNT 10 BLOCK 5000 STREAMS stream1 stream2 $ $
BLOCK 0 komutu burada çok önemli. Bu sayede yeni mesaj gelene kadar bağlantı açık kalıyor. 0 sonsuz bekleme anlamına geliyor, 5000 ise 5 saniye bekle sonra boş dön demek.
Stream Bilgilerini Görüntüleme
# Stream hakkında genel bilgi
redis-cli XINFO STREAM mystream
# Detaylı bilgi (full flag ile)
redis-cli XINFO STREAM mystream FULL COUNT 10
# Consumer group bilgileri
redis-cli XINFO GROUPS mystream
# Belirli bir gruptaki consumer'lar
redis-cli XINFO CONSUMERS mystream mygroup
Consumer Groups ile Paralel İşleme
Bu özellik Redis Streams’i gerçekten güçlü kılıyor. Consumer Group sayesinde birden fazla worker aynı stream’den farklı mesajlar alıp paralel işleyebiliyor. Bir e-ticaret platformunda sipariş işleme sistemini düşün: 10 worker aynı sipariş stream’ini dinliyor, her worker farklı bir siparişi alıp işliyor.
# Consumer group oluşturma
# 0: En başından itibaren oku
# $: Sadece yeni mesajları oku
redis-cli XGROUP CREATE orders_stream processors $ MKSTREAM
# MKSTREAM: Stream yoksa otomatik oluştur
redis-cli XGROUP CREATE sensor_data analytics 0 MKSTREAM
# Group bilgilerini kontrol et
redis-cli XGROUP SETID orders_stream processors $
# Mevcut bir group'u sil
redis-cli XGROUP DESTROY orders_stream old_processors
Consumer’larla Mesaj Okuma (XREADGROUP)
# Worker-1 mesaj alıyor
redis-cli XREADGROUP GROUP processors worker-1 COUNT 5 STREAMS orders_stream >
# > işareti: Henüz başka consumer'a verilmemiş yeni mesajları getir
# Bu mesajlar artık "pending" durumda, worker-1'in sorumluluğunda
# Belirli bir worker'ın pending mesajlarını tekrar okuma
redis-cli XREADGROUP GROUP processors worker-1 COUNT 5 STREAMS orders_stream 0
# Blocking mod ile sürekli dinleme
redis-cli XREADGROUP GROUP processors worker-2 COUNT 1 BLOCK 2000 STREAMS orders_stream >
Mesaj Onaylama (XACK)
# Başarıyla işlenen mesajı onayla
redis-cli XACK orders_stream processors 1703001234567-0
# Birden fazla mesajı aynı anda onayla
redis-cli XACK orders_stream processors 1703001234567-0 1703001234568-0 1703001234569-0
# Pending mesajları listele (onaylanmamış mesajlar)
redis-cli XPENDING orders_stream processors - + 10
# Belirli bir consumer'ın pending mesajları
redis-cli XPENDING orders_stream processors - + 10 worker-1
Gerçek Dünya Senaryosu: E-Ticaret Sipariş İşleme
Diyelim ki bir e-ticaret platformu yönetiyorsun. Siparişler geldiğinde bunları birden fazla sisteme iletmen gerekiyor: envanter güncelleme, ödeme doğrulama, e-posta bildirimi, kargo sistemi. Her biri farklı hızlarda çalışıyor.
# Sipariş stream'i oluştur ve farklı işlemciler için gruplar kur
redis-cli XGROUP CREATE orders inventory_service $ MKSTREAM
redis-cli XGROUP CREATE orders payment_service $ MKSTREAM
redis-cli XGROUP CREATE orders notification_service $ MKSTREAM
# Yeni sipariş geldi
redis-cli XADD orders MAXLEN ~ 10000 *
order_id "ORD-2024-001"
customer_id "USR-456"
product_id "PRD-789"
quantity 2
total_amount 299.99
status "pending"
# Inventory service mesajı alıyor
redis-cli XREADGROUP GROUP orders inventory_service inv-worker-1
COUNT 10 BLOCK 1000 STREAMS orders >
# İşlem başarılı, onayla
redis-cli XACK orders inventory_service 1703001234567-0
# Payment service bağımsız olarak aynı mesajı alıyor
redis-cli XREADGROUP GROUP orders payment_service pay-worker-1
COUNT 10 BLOCK 1000 STREAMS orders >
Bu yapıda her servis kendi hızında çalışıyor. Notification service yavaş olsa bile inventory service’i etkilemiyor.
Dead Letter Queue ile Hata Yönetimi
Gerçek sistemlerde bazı mesajlar işlenemez. Belki veri formatı bozuk, belki bağımlı bir servis çökmüş. Bu durumda mesajı sonsuz döngüye sokmak yerine bir “dead letter queue”ya taşıman gerekiyor.
# Uzun süredir pending olan mesajları kontrol et (idle time ms cinsinden)
redis-cli XPENDING orders processors - + 10
# Mesajı başka bir consumer'a transfer et (60 saniyeden fazla bekliyorsa)
# XCLAIM komutu ile sahiplik değiştir
redis-cli XCLAIM orders processors worker-2 60000 1703001234567-0
# Minimum idle time ile otomatik claim (Redis 6.2+)
redis-cli XAUTOCLAIM orders processors worker-2 60000 0-0 COUNT 10
# Çok fazla deneme yapıldıysa dead letter queue'ya taşı
redis-cli XADD orders_failed MAXLEN ~ 1000 *
original_id "1703001234567-0"
error "max_retries_exceeded"
original_data "..."
failed_at "2024-01-15T10:30:00"
# Orijinal stream'den sil
redis-cli XDEL orders 1703001234567-0
Python ile Gerçek Zamanlı Consumer Uygulaması
Komut satırı örnekleri yetmez, gerçek bir uygulama nasıl yazılır ona bakalım. Aşağıdaki örnek production’da kullanabileceğin bir yapıyı gösteriyor:
# Önce gerekli kütüphaneyi kur
pip install redis
# Python consumer script'i (consumer_worker.py)
cat << 'EOF' > consumer_worker.py
import redis
import json
import time
import signal
import sys
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
STREAM_NAME = 'orders'
GROUP_NAME = 'processors'
CONSUMER_NAME = f'worker-{sys.argv[1] if len(sys.argv) > 1 else "1"}'
MAX_RETRIES = 3
running = True
def signal_handler(sig, frame):
global running
print(f"n{CONSUMER_NAME} durduruluyor...")
running = False
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
def process_order(message_id, data):
"""Sipariş işleme mantığı"""
print(f"İşleniyor: {message_id} - Sipariş: {data.get('order_id')}")
# Gerçek işlem burada yapılır
time.sleep(0.1) # Simüle edilmiş işlem süresi
return True
def handle_pending_messages():
"""Onaylanmamış mesajları tekrar işle"""
pending = r.xpending_range(STREAM_NAME, GROUP_NAME, '-', '+', 10, CONSUMER_NAME)
for item in pending:
msg_id = item['message_id']
idle_time = item['time_since_delivered']
delivery_count = item['times_delivered']
if delivery_count >= MAX_RETRIES:
# Dead letter queue'ya taşı
messages = r.xrange(STREAM_NAME, msg_id, msg_id)
if messages:
r.xadd('orders_failed', {'original_id': msg_id,
'data': json.dumps(dict(messages[0][1])),
'reason': 'max_retries'})
r.xack(STREAM_NAME, GROUP_NAME, msg_id)
print(f"Dead letter: {msg_id}")
elif idle_time > 30000: # 30 saniyeden fazla bekliyorsa
r.xclaim(STREAM_NAME, GROUP_NAME, CONSUMER_NAME, 30000, [msg_id])
# Pending mesajları önce işle
handle_pending_messages()
print(f"{CONSUMER_NAME} başlatıldı, {STREAM_NAME} dinleniyor...")
while running:
try:
messages = r.xreadgroup(
GROUP_NAME, CONSUMER_NAME,
{STREAM_NAME: '>'},
count=10, block=2000
)
if not messages:
continue
for stream, msgs in messages:
for msg_id, data in msgs:
try:
if process_order(msg_id, data):
r.xack(STREAM_NAME, GROUP_NAME, msg_id)
print(f"Onaylandi: {msg_id}")
except Exception as e:
print(f"Hata: {msg_id} - {e}")
except redis.ConnectionError as e:
print(f"Baglanti hatasi: {e}")
time.sleep(5)
except Exception as e:
print(f"Beklenmeyen hata: {e}")
time.sleep(1)
print(f"{CONSUMER_NAME} durduruldu.")
EOF
# Worker'ı çalıştır
python3 consumer_worker.py 1 &
python3 consumer_worker.py 2 &
python3 consumer_worker.py 3 &
Monitoring ve İzleme
Production’da stream’lerin sağlığını izlemen şart. Aşağıdaki komutlar günlük monitoring rutininin parçası olmalı:
#!/bin/bash
# stream_monitor.sh - Redis Stream sağlık kontrol scripti
REDIS_CLI="redis-cli"
STREAM="orders"
GROUP="processors"
ALERT_THRESHOLD=1000 # 1000'den fazla pending varsa uyar
echo "=== Redis Stream Monitor ==="
echo "Tarih: $(date)"
echo ""
# Stream uzunluğu
STREAM_LEN=$($REDIS_CLI XLEN $STREAM)
echo "Stream uzunlugu: $STREAM_LEN mesaj"
# Consumer group durumu
echo ""
echo "=== Consumer Group Durumu ==="
$REDIS_CLI XINFO GROUPS $STREAM | grep -E "name|consumers|pending|last-delivered"
# Pending mesaj sayısı
PENDING_COUNT=$($REDIS_CLI XPENDING $STREAM $GROUP - + 1 | wc -l)
echo ""
echo "Pending mesaj sayisi: $PENDING_COUNT"
if [ "$PENDING_COUNT" -gt "$ALERT_THRESHOLD" ]; then
echo "UYARI: Cok fazla pending mesaj! ($PENDING_COUNT)"
# Buraya alerting mekanizman gelebilir
fi
# Her consumer'ın durumu
echo ""
echo "=== Consumer Detaylari ==="
$REDIS_CLI XINFO CONSUMERS $STREAM $GROUP
# Memory kullanımı
echo ""
echo "=== Memory Kullanimi ==="
$REDIS_CLI MEMORY USAGE $STREAM
# En eski pending mesaj
echo ""
echo "=== En Eski Pending Mesaj ==="
$REDIS_CLI XPENDING $STREAM $GROUP - + 1
# Script'i çalıştırılabilir yap ve cronjob ekle
chmod +x stream_monitor.sh
# Her 5 dakikada bir çalıştır
echo "*/5 * * * * /opt/scripts/stream_monitor.sh >> /var/log/redis-stream-monitor.log 2>&1" | crontab -
Performance Tuning ve Best Practices
Streams’i production’da kullanırken dikkat etmen gereken birkaç kritik nokta var:
MAXLEN kullanımını ihmal etme: Stream sonsuz büyüyebilir ve Redis memory’sini tüketir. Her XADD’e MAXLEN ~ 10000 eklemek alışkanlık haline getir.
Batch okuma yap: Her seferinde tek mesaj okumak yerine COUNT 100 ile toplu oku. Ağ gecikmesi azalır, throughput artar.
Uygun block timeout seç: BLOCK 0 sonsuz bekleme yapar ama bağlantı sorunlarında fark etmezsin. BLOCK 5000 gibi bir değer hem reaktif hem de hata toleranslı çalışmanı sağlar.
Consumer isimlerini unique yap: Aynı isimde iki consumer olursa mesajlar karışır. Hostname + PID kombinasyonu iyi bir strateji.
Trim stratejini belirle: Sadece MAXLEN değil, XAUTOCLAIM ile de düzenli temizlik yap. Eski pending mesajları takip et.
# Redis config optimizasyonları (redis.conf)
# Stream için özel ayarlar yok ama genel memory yönetimi kritik
# Maksimum memory sınırı
redis-cli CONFIG SET maxmemory 4gb
# Memory policy: streams için noeviction önerilir
# allkeys-lru yerine noeviction kullan, veri kaybetme
redis-cli CONFIG SET maxmemory-policy noeviction
# Stream entry sayısını sınırla (hash-max-listpack-entries ile benzer mantık)
redis-cli CONFIG SET stream-node-max-bytes 4096
redis-cli CONFIG SET stream-node-max-entries 100
# Persistence ayarları - stream verisi için AOF önerilir
redis-cli CONFIG SET appendonly yes
redis-cli CONFIG SET appendfsync everysec
IoT Sensör Verisi Senaryo: Gerçek Zamanlı Analiz
Fabrika ortamında yüzlerce sensörden saniyede binlerce veri geldiğini düşün. Bu veriyi toplamak, filtrelemek ve analize göndermek için Redis Streams mükemmel bir köprü görevi görüyor:
# Sensör verisi üretici simülasyonu
cat << 'SCRIPT' > sensor_producer.sh
#!/bin/bash
SENSORS=("temp-01" "temp-02" "pressure-01" "humidity-01" "vibration-01")
STREAM="factory_sensors"
# Stream'i temizle ve grupları oluştur
redis-cli XGROUP CREATE $STREAM anomaly_detector $ MKSTREAM 2>/dev/null
redis-cli XGROUP CREATE $STREAM data_archiver $ MKSTREAM 2>/dev/null
echo "Sensor verisi uretiliyor..."
while true; do
for sensor in "${SENSORS[@]}"; do
# Rastgele sensör değerleri üret
VALUE=$(awk 'BEGIN{srand(); printf "%.2f", 20 + rand() * 80}')
TIMESTAMP=$(date +%s%3N)
MSG_ID=$(redis-cli XADD $STREAM MAXLEN "~" 50000 "*"
sensor_id "$sensor"
value "$VALUE"
unit "celsius"
location "hall-A"
timestamp "$TIMESTAMP")
# Anomali tespiti için basit eşik kontrolü
if (( $(echo "$VALUE > 85" | bc -l) )); then
redis-cli XADD alerts MAXLEN "~" 1000 "*"
sensor_id "$sensor"
alert_type "high_temp"
value "$VALUE"
threshold 85
msg_ref "$MSG_ID" > /dev/null
echo "UYARI: $sensor - Yuksek sicaklik: $VALUE"
fi
done
sleep 0.1
done
SCRIPT
chmod +x sensor_producer.sh
# Arka planda çalıştır
./sensor_producer.sh &
# Belirli bir sensörün son 10 kaydını izle
redis-cli XREVRANGE factory_sensors + - COUNT 10
# Anomali alerts'i izle
redis-cli XREAD COUNT 5 BLOCK 1000 STREAMS alerts $
Sonuç
Redis Streams, mesaj kuyruğu ihtiyaçlarının büyük çoğunluğunu tek bir araçla çözüyor. Kafka veya RabbitMQ gibi sistemlerin kurulum ve yönetim karmaşıklığı olmadan, mevcut Redis altyapın üzerinde çalışıyor. Bu yazıda ele aldığımız konuları özetlersek:
- Consumer Groups ile paralel ve ölçeklenebilir mesaj tüketimi yapabiliyorsun
- XACK mekanizması sayesinde mesaj kayıpları yaşamadan “at-least-once” garantisi sağlıyorsun
- XCLAIM ve XAUTOCLAIM ile başarısız işlemleri otomatik olarak yeniden dağıtabiliyorsun
- MAXLEN ile memory kullanımını kontrol altında tutabiliyorsun
- Monitoring scriptleri ile sistemin sağlığını proaktif takip edebiliyorsun
Özellikle şu durumlarda Redis Streams’i tercih etmeni öneririm: Ekibin Kafka yönetme kapasitesine sahip değilse, saniyede onbinlerce mesaj işlemenin gerekmediği orta ölçekli sistemlerde ve zaten Redis kullanan bir altyapın varsa. 100K+ messages/saniye gibi extreme throughput ihtiyaçlarında tabii ki Kafka daha uygun olacak.
Production’a geçmeden önce mutlaka yük testi yap, MAXLEN değerlerini dikkatli belirle ve dead letter queue mekanizmanı test et. Pending mesajları düzenli temizlemezsen zamanla hem memory hem de performans sorunları yaşarsın. Monitoring scriptini cronjob olarak ekle, alert threshold’larını sistemine göre ayarla.
Redis 7.x ile gelen iyileştirmeler de göz önüne alındığında Streams her sürümde daha olgunlaşıyor. Özellikle büyük ekiplerde geliştirici deneyimi açısından da çok daha kolay entegre edilebilir bir yapıya sahip. Deneyin, production’a taşıyın, iyi şanslar.