Redis Queue (RQ) ile Basit İş Kuyruğu Yönetimi
Prodüksiyonda bir e-ticaret sitesi yönetirken en sinir bozucu şey nedir biliyor musunuz? Kullanıcı “Sipariş Ver” butonuna tıklar, sistem fatura oluşturmaya, e-posta göndermeye, stok güncellemeye çalışır ve tam bu sırada sayfa donup kalır. Müşteri sayfayı yeniler, sipariş iki kez oluşur, kargo müdürü sizi arar. Bu senaryoyu yaşadım ve çözüm basitti: iş kuyruğu. Bugün Python ekosisteminin en temiz iş kuyruğu çözümlerinden biri olan Redis Queue (RQ) hakkında konuşacağız.
Redis Queue Nedir, Neden Kullanmalıyız?
RQ, Python için geliştirilmiş, Redis üzerine kurulu hafif bir iş kuyruğu kütüphanesidir. Celery kadar karmaşık değildir, ama küçük ve orta ölçekli projelerde Celery’nin sunduğu şeylerin yüzde seksenini çok daha az konfigürasyonla yapabilirsiniz.
Temel mantık şu: Web uygulamanız uzun süren bir işi (e-posta gönderme, PDF oluşturma, API çağrısı, video işleme) hemen yapmak yerine bu işi bir kuyruğa ekler. Arka planda çalışan worker süreçleri bu kuyruğu dinler ve işleri sırayla çalıştırır. Kullanıcı yanıtını anında alır, ağır iş arka planda hallolur.
RQ’nun öne çıkan özellikleri:
- Sıfır konfigürasyon karmaşıklığı: Broker olarak sadece Redis lazım, başka hiçbir şey.
- İş önceliklendirme: Farklı kuyruklar tanımlayarak yüksek ve düşük öncelikli işleri ayırabilirsiniz.
- Başarısız iş yönetimi: Hata veren işler
failedkuyruğuna düşer, incelenebilir ve yeniden çalıştırılabilir. - İş durumu takibi: Her işin durumunu (
queued,started,finished,failed) anlık olarak sorgulayabilirsiniz. - Dashboard desteği:
rq-dashboardile tarayıcı üzerinden görsel izleme yapılabilir.
Ortam Kurulumu
Önce Redis’in çalıştığından emin olalım. Sunucunuzda Redis yoksa:
# Ubuntu/Debian
sudo apt update && sudo apt install redis-server -y
sudo systemctl enable redis-server
sudo systemctl start redis-server
# Redis çalışıyor mu kontrol edelim
redis-cli ping
# PONG dönmeli
Python tarafında ihtiyacımız olanları yükleyelim:
# Sanal ortam oluşturmak iyi pratik
python3 -m venv rq_env
source rq_env/bin/activate
# RQ ve bağımlılıkları
pip install rq redis rq-dashboard
# Flask kullanacaksak (örnek için)
pip install flask
Proje yapısını şöyle düzenleyelim:
mkdir rq_demo && cd rq_demo
touch app.py tasks.py worker.py requirements.txt
İlk İş Fonksiyonunu Yazmak
RQ’da bir “iş” aslında sıradan bir Python fonksiyonudur. Özel bir dekoratöre veya sınıfa ihtiyacınız yok. Bu, RQ’nun en sevdiğim özelliği.
tasks.py dosyamıza gerçekçi örnekler yazalım:
# tasks.py
import time
import smtplib
import logging
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def send_welcome_email(user_email: str, username: str) -> dict:
"""Yeni kayıt olan kullanıcıya hoşgeldin maili gönderir."""
logger.info(f"E-posta gönderiliyor: {user_email}")
# Gerçek projede SMTP ayarlarınız config'den gelir
# Burada simüle ediyoruz
time.sleep(2) # SMTP bağlantısı simülasyonu
# Gerçek SMTP kodu şöyle olurdu:
# msg = MIMEMultipart()
# msg['From'] = '[email protected]'
# msg['To'] = user_email
# msg['Subject'] = f'Hoşgeldin {username}!'
# ...
logger.info(f"E-posta başarıyla gönderildi: {user_email}")
return {
"status": "success",
"recipient": user_email,
"username": username
}
def generate_monthly_report(month: int, year: int) -> str:
"""Aylık rapor oluşturur - uzun süren bir işlem."""
logger.info(f"Rapor oluşturuluyor: {month}/{year}")
# Veri tabanı sorguları, hesaplamalar vs. simülasyonu
for i in range(5):
time.sleep(1)
logger.info(f"Rapor ilerleme: %{(i+1)*20}")
rapor_yolu = f"/tmp/rapor_{year}_{month:02d}.pdf"
logger.info(f"Rapor tamamlandı: {rapor_yolu}")
return rapor_yolu
def process_image(image_path: str, width: int, height: int) -> dict:
"""Görsel yeniden boyutlandırma işlemi."""
logger.info(f"Görsel işleniyor: {image_path}")
time.sleep(3) # Görsel işleme simülasyonu
output_path = image_path.replace('.jpg', f'_{width}x{height}.jpg')
return {
"original": image_path,
"processed": output_path,
"dimensions": f"{width}x{height}"
}
def update_search_index(product_id: int) -> bool:
"""Ürün arama indeksini günceller."""
logger.info(f"Arama indeksi güncelleniyor: ürün #{product_id}")
time.sleep(1)
return True
Worker Süreci
Worker, kuyruğu dinleyen ve işleri çalıştıran süreçtir. Basit bir worker.py oluşturalım:
# worker.py
import os
from redis import Redis
from rq import Worker, Queue, Connection
# Redis bağlantısı
redis_conn = Redis(
host=os.getenv('REDIS_HOST', 'localhost'),
port=int(os.getenv('REDIS_PORT', 6379)),
db=int(os.getenv('REDIS_DB', 0)),
password=os.getenv('REDIS_PASSWORD', None)
)
# Dinlenecek kuyruklar (öncelik sırasıyla)
listen = ['high', 'default', 'low']
if __name__ == '__main__':
with Connection(redis_conn):
worker = Worker(map(Queue, listen))
worker.work()
Worker’ı başlatalım:
# Geliştirme ortamında
python worker.py
# Prodüksiyonda arka planda çalıştırmak için
nohup python worker.py > /var/log/rq_worker.log 2>&1 &
# Ya da systemd ile (tavsiye edilir, aşağıda anlatıyorum)
Flask ile Entegrasyon: Gerçek Dünya Senaryosu
E-ticaret örneğimize dönelim. Kullanıcı sipariş verdiğinde hangi işleri kuyruğa almalıyız?
# app.py
from flask import Flask, jsonify, request
from redis import Redis
from rq import Queue
from rq.job import Job
import uuid
from tasks import (
send_welcome_email,
generate_monthly_report,
process_image,
update_search_index
)
app = Flask(__name__)
# Redis ve kuyruk bağlantıları
redis_conn = Redis(host='localhost', port=6379)
# Farklı öncelik seviyeleri için ayrı kuyruklar
high_queue = Queue('high', connection=redis_conn)
default_queue = Queue('default', connection=redis_conn)
low_queue = Queue('low', connection=redis_conn)
@app.route('/api/register', methods=['POST'])
def user_register():
"""Kullanıcı kaydı - hoşgeldin maili kuyruğa alınır."""
data = request.json
username = data.get('username')
email = data.get('email')
# Kullanıcıyı veritabanına kaydet (simülasyon)
user_id = str(uuid.uuid4())[:8]
# Hoşgeldin mailini kuyruğa ekle - kullanıcı beklemek zorunda değil
job = default_queue.enqueue(
send_welcome_email,
email,
username,
job_timeout=300, # 5 dakika timeout
result_ttl=86400, # Sonucu 24 saat sakla
failure_ttl=604800 # Hata kaydını 7 gün sakla
)
return jsonify({
"message": "Kayıt başarılı! Hoşgeldin maili gönderilecek.",
"user_id": user_id,
"email_job_id": job.id
}), 201
@app.route('/api/reports/monthly', methods=['POST'])
def create_monthly_report():
"""Aylık rapor oluşturma - düşük öncelikli."""
data = request.json
month = data.get('month')
year = data.get('year')
# Düşük öncelikli kuyruğa ekle, sistem meşgul olmadığında çalışsın
job = low_queue.enqueue(
generate_monthly_report,
month,
year,
job_timeout=1800 # 30 dakika timeout - rapor uzun sürebilir
)
return jsonify({
"message": "Rapor oluşturma başlatıldı.",
"job_id": job.id,
"status_url": f"/api/jobs/{job.id}"
}), 202
@app.route('/api/products/<int:product_id>/image', methods=['POST'])
def upload_product_image(product_id):
"""Ürün görseli yükleme - yüksek öncelikli."""
image_path = f"/uploads/products/{product_id}_original.jpg"
# Yüksek öncelikli kuyruğa ekle - ürün görseli önemli
job = high_queue.enqueue(
process_image,
image_path,
800,
600,
job_timeout=120
)
# Arama indeksini de güncelle - varsayılan öncelik
index_job = default_queue.enqueue(
update_search_index,
product_id,
depends_on=job # Görsel işlendikten sonra çalışsın
)
return jsonify({
"message": "Görsel yüklendi, işleniyor.",
"image_job_id": job.id,
"index_job_id": index_job.id
}), 202
@app.route('/api/jobs/<job_id>', methods=['GET'])
def get_job_status(job_id):
"""İş durumunu sorgula."""
try:
job = Job.fetch(job_id, connection=redis_conn)
response = {
"job_id": job.id,
"status": job.get_status(),
"created_at": str(job.created_at),
"enqueued_at": str(job.enqueued_at),
}
if job.is_finished:
response["result"] = job.result
response["ended_at"] = str(job.ended_at)
elif job.is_failed:
response["error"] = str(job.exc_info)
return jsonify(response)
except Exception as e:
return jsonify({"error": f"İş bulunamadı: {job_id}"}), 404
if __name__ == '__main__':
app.run(debug=True, port=5000)
İş Bağımlılıkları ve Zincirleme
Yukarıdaki örnekte depends_on parametresini kullandım. Bu çok güçlü bir özellik. Şimdi daha karmaşık bir senaryo düşünelim: Kullanıcı bir video yüklüyor, önce video işlenmeli, sonra küçük resim oluşturulmalı, sonra bildirim gönderilmeli.
# tasks.py dosyasına ekleyin
def transcode_video(video_path: str) -> str:
"""Video dönüştürme işlemi."""
logger.info(f"Video dönüştürülüyor: {video_path}")
time.sleep(10)
output = video_path.replace('.mp4', '_720p.mp4')
logger.info(f"Video hazır: {output}")
return output
def create_thumbnail(video_path: str) -> str:
"""Video küçük resmi oluşturma."""
logger.info(f"Küçük resim oluşturuluyor: {video_path}")
time.sleep(2)
thumbnail = video_path.replace('.mp4', '_thumb.jpg')
return thumbnail
def notify_user(user_id: int, message: str) -> bool:
"""Kullanıcıya bildirim gönder."""
logger.info(f"Bildirim gönderiliyor: kullanıcı {user_id}")
time.sleep(1)
return True
# Zincirleme iş örneği - app.py'e ekleyin
@app.route('/api/videos/upload', methods=['POST'])
def upload_video():
data = request.json
video_path = data.get('path')
user_id = data.get('user_id')
# Zincirleme: transcode -> thumbnail -> notify
transcode_job = high_queue.enqueue(
'tasks.transcode_video',
video_path,
job_timeout=600
)
thumb_job = default_queue.enqueue(
'tasks.create_thumbnail',
video_path,
depends_on=transcode_job,
job_timeout=60
)
notify_job = default_queue.enqueue(
'tasks.notify_user',
user_id,
"Videonuz işlendi!",
depends_on=thumb_job,
job_timeout=30
)
return jsonify({
"pipeline": {
"transcode": transcode_job.id,
"thumbnail": thumb_job.id,
"notify": notify_job.id
}
}), 202
Başarısız İşleri Yönetmek
Prodüksiyonda işler başarısız olur. Bu kaçınılmaz. RQ’nun başarısız iş yönetimi oldukça kullanışlı.
# failed_jobs_manager.py
from redis import Redis
from rq import Queue
from rq.job import Job
from rq.registry import FailedJobRegistry, StartedJobRegistry
import json
redis_conn = Redis(host='localhost', port=6379)
def list_failed_jobs(queue_name='default'):
"""Başarısız işleri listele."""
queue = Queue(queue_name, connection=redis_conn)
registry = FailedJobRegistry(queue=queue)
failed_jobs = []
for job_id in registry.get_job_ids():
job = Job.fetch(job_id, connection=redis_conn)
failed_jobs.append({
"id": job.id,
"func": job.func_name,
"args": str(job.args),
"error": str(job.exc_info)[:200] if job.exc_info else None,
"failed_at": str(job.ended_at)
})
return failed_jobs
def retry_failed_job(job_id: str, queue_name='default'):
"""Başarısız bir işi yeniden kuyruğa ekle."""
queue = Queue(queue_name, connection=redis_conn)
registry = FailedJobRegistry(queue=queue)
job = Job.fetch(job_id, connection=redis_conn)
# Başarısız kayıttan kaldır
registry.remove(job)
# Aynı parametrelerle yeniden kuyruğa ekle
new_job = queue.enqueue_job(job)
print(f"İş yeniden kuyruğa alındı. Yeni ID: {new_job.id}")
return new_job.id
def clear_failed_jobs(queue_name='default'):
"""Tüm başarısız işleri temizle."""
queue = Queue(queue_name, connection=redis_conn)
registry = FailedJobRegistry(queue=queue)
count = 0
for job_id in registry.get_job_ids():
job = Job.fetch(job_id, connection=redis_conn)
job.delete()
count += 1
print(f"{count} başarısız iş temizlendi.")
return count
if __name__ == '__main__':
print("Başarısız işler:")
for job in list_failed_jobs():
print(json.dumps(job, ensure_ascii=False, indent=2))
Komut satırından da başarısız işleri yönetebilirsiniz:
# Kuyruk durumunu görmek için
rq info
# Belirli bir kuyruğu izlemek için
rq info default high low
# Tüm worker'ları ve durumlarını görmek için
rq info --interval 1 # Her saniye güncelle
Systemd ile Prodüksiyon Kurulumu
Geliştirme ortamında worker’ı elle başlatıyoruz ama prodüksiyonda systemd servisi şart. Birden fazla worker çalıştırmak için template servis kullanalım:
# /etc/systemd/system/[email protected]
sudo tee /etc/systemd/system/[email protected] << 'EOF'
[Unit]
Description=RQ Worker Instance %i
After=network.target redis.service
Requires=redis.service
[Service]
Type=simple
User=www-data
Group=www-data
WorkingDirectory=/var/www/myapp
Environment="PATH=/var/www/myapp/rq_env/bin"
Environment="REDIS_HOST=localhost"
Environment="REDIS_PORT=6379"
ExecStart=/var/www/myapp/rq_env/bin/python worker.py
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
SyslogIdentifier=rq-worker-%i
[Install]
WantedBy=multi-user.target
EOF
# 3 worker instance başlat
sudo systemctl daemon-reload
sudo systemctl enable rq-worker@1 rq-worker@2 rq-worker@3
sudo systemctl start rq-worker@1 rq-worker@2 rq-worker@3
# Durumları kontrol et
sudo systemctl status rq-worker@*
# Log takibi
sudo journalctl -u rq-worker@1 -f
RQ Dashboard ile İzleme
rq-dashboard kuruluydu, kullanalım:
# Basit başlatma
rq-dashboard --redis-host localhost --redis-port 6379
# Belirli bir portta
rq-dashboard -p 9181
# Nginx arkasında çalışıyorsa prefix ile
rq-dashboard --url-prefix /rqd
Dashboard için basit bir Nginx konfigürasyonu:
# /etc/nginx/sites-available/rq-dashboard
server {
listen 80;
server_name monitoring.siteniz.com;
# Temel kimlik doğrulama - dışarıya açmayın!
auth_basic "RQ Dashboard";
auth_basic_user_file /etc/nginx/.htpasswd;
location / {
proxy_pass http://127.0.0.1:9181;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
}
Dashboard’a dışarıdan erişim açıyorsanız mutlaka kimlik doğrulama ekleyin. Bunu atlayanları gördüm, Redis verilerine herkes erişebilir hale geldi.
Zamanlı İşler: RQ Scheduler
Belirli aralıklarla çalışması gereken işler için rq-scheduler kullanabiliriz:
pip install rq-scheduler
# scheduler_setup.py
from redis import Redis
from rq import Queue
from rq_scheduler import Scheduler
from datetime import datetime, timedelta
import pytz
redis_conn = Redis(host='localhost', port=6379)
scheduler = Scheduler(connection=redis_conn)
# Mevcut zamanlanmış işleri temizle (yeniden başlatmada duplikasyon önlemek için)
for job in scheduler.get_jobs():
scheduler.cancel(job)
# Her gece 02:00'de aylık rapor oluştur (sadece ay başlarında)
turkey_tz = pytz.timezone('Europe/Istanbul')
scheduler.cron(
"0 2 1 * *", # Her ayın 1'inde saat 02:00
func='tasks.generate_monthly_report',
args=[datetime.now().month, datetime.now().year],
queue_name='low',
use_local_timezone=True
)
# Her 30 dakikada bir önbelleği güncelle
scheduler.schedule(
scheduled_time=datetime.utcnow(),
func='tasks.update_search_index',
args=[0],
interval=1800, # 30 dakika = 1800 saniye
repeat=None, # Sonsuza kadar tekrar et
queue_name='low'
)
print("Zamanlanmış işler ayarlandı.")
# Scheduler'ı başlat
rqscheduler --host localhost --port 6379
# Systemd servisi olarak
sudo tee /etc/systemd/system/rq-scheduler.service << 'EOF'
[Unit]
Description=RQ Scheduler
After=network.target redis.service
[Service]
Type=simple
User=www-data
WorkingDirectory=/var/www/myapp
Environment="PATH=/var/www/myapp/rq_env/bin"
ExecStart=/var/www/myapp/rq_env/bin/rqscheduler --host localhost --port 6379
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
EOF
Pratik İpuçları ve Dikkat Edilecek Noktalar
Yıllar içinde RQ kullanırken öğrendiğim bazı kritik noktalar:
İş fonksiyonları her zaman import edilebilir olmalı. RQ, fonksiyonu string olarak saklar ve worker tarafında import eder. Fonksiyonunuz lambda olamaz, sınıf metodları dikkatli kullanılmalı.
Büyük veri iş argümanları olmamalı. Bir müşteri listesini (10.000 satır) job argümanı olarak geçirmeyin. Bunun yerine veritabanı filtresini geçirin, worker kendi çeksin.
Timeout’ları gerçekçi ayarlayın. Varsayılan 180 saniyedir. Video işleme veya büyük rapor işleri için bu yeterli olmaz.
Her worker ayrı bir veritabanı bağlantısı açar. 10 worker çalıştırıyorsanız connection pool buna göre ayarlanmalı.
Başarısız iş TTL’ini unutmayın. failure_ttl ayarlanmazsa başarısız işler Redis’te sonsuza kadar kalır ve bellek dolabilir.
Redis bağlantı kesilmelerini yönetin. Worker’lara --burst modu yerine systemd restart politikası daha güvenli.
Sonuç
RQ, Python projeleri için iş kuyruğu ihtiyacını karşılamanın en az sürtünmeli yollarından biri. Celery ile kıyaslandığında kurulum ve bakım maliyeti çok daha düşük. Küçük bir startup’tan günde milyonlarca işlem yapan sistemlere kadar RQ kullanan örnekler var.
Benim önerim şu: Eğer projenizde asenkron iş işleme ihtiyacı varsa ve Celery’nin karmaşıklığından kaçınmak istiyorsanız, RQ ile başlayın. Ölçeklenme ihtiyacı çok ciddi bir noktaya gelirse Celery’ye geçiş her zaman mümkün, ama çoğu proje RQ ile sonuna kadar götürülebilir.
Prodüksiyon öncesinde dikkat edilmesi gereken kontrol listesi:
- Redis için kalıcı depolama (AOF veya RDB) yapılandırıldı mı?
- Worker sayısı iş yüküne göre ayarlandı mı?
- Başarısız iş bildirimleri (Sentry, e-posta) kurulu mu?
- Kuyruk uzunluğu için alarm mekanizması var mı?
- Worker’lar systemd ile otomatik yeniden başlatılıyor mu?
- Dashboard kimlik doğrulama ile korunuyor mu?
Bu adımları tamamladıktan sonra çalışan, güvenilir bir iş kuyruğu sisteminiz olacak. Kullanıcılarınız artık sipariş verdikten sonra sayfa donmasını beklemeyecek.
