Celery Kurulumu: RabbitMQ ve Redis ile Entegrasyon

Dağıtık sistemlerle ciddi anlamda ilk kez boğuştuğumda, bir e-ticaret projesinde kullanıcılar sipariş verdiğinde fatura PDF’i oluşturma işlemi tam 8-12 saniye sürüyordu. Kullanıcı o süre boyunca beyaz ekrana bakıyordu. Celery’yi devreye aldığımızda bu iş arka plana düştü, kullanıcı anında “Siparişiniz alındı” mesajını gördü, PDF de 10 saniye sonra e-postasına düştü. İşte asenkron görev yönetiminin özü bu.

Bu yazıda Celery’yi hem RabbitMQ hem de Redis broker’ı ile nasıl kurup yapılandıracağınızı, production ortamında nelere dikkat etmeniz gerektiğini ve ikisi arasındaki tercih noktalarını gerçek dünya perspektifiyle anlatacağım.

Celery Nedir ve Neden Lazım?

Celery, Python tabanlı dağıtık bir görev kuyruğu sistemidir. Bir broker (mesaj kuyruğu) aracılığıyla görevleri alır, worker süreçleri üzerinde çalıştırır ve isteğe bağlı olarak sonuçları bir backend’e yazar.

Şu senaryoları düşünün:

  • Kullanıcı kayıt olduğunda hoş geldin e-postası gönderme
  • Büyük CSV dosyalarını parse edip veritabanına yazma
  • Gecede bir çalışan rapor oluşturma görevleri
  • Ödeme işlemi sonrası ERP sistemine bildirim gönderme
  • Görüntü yeniden boyutlandırma ve watermark ekleme

Bunların hepsini HTTP request-response döngüsü içinde yapmak hem kullanıcı deneyimini mahveder hem de web sunucunuzu gereksiz yere meşgul eder. Celery bu işleri queue’ya alır, istediğiniz zamanda, istediğiniz kapasite ile işler.

Celery’nin temel bileşenleri şunlardır:

  • Broker: Görevlerin gönderildiği ve bekletildiği kuyruk sistemi (RabbitMQ, Redis)
  • Worker: Görevleri kuyruktan alıp çalıştıran süreç
  • Backend: Görev sonuçlarının saklandığı yer (Redis, veritabanı)
  • Beat: Zamanlanmış görevleri tetikleyen scheduler

Ortam Hazırlığı

Ben genellikle bütün bu servisleri önce tek bir sunucuda ayağa kaldırıp test ederim, sonra production’da ayırdığım ayrı instance’lara taşırım. Şimdi de bu sırayı izleyeceğiz.

Önce Python virtual environment oluşturup temel paketleri kuralım:

python3 -m venv celery_env
source celery_env/bin/activate

pip install celery
pip install "celery[redis]"      # Redis broker için
pip install "celery[rabbitmq]"   # RabbitMQ için (amqp)
pip install flower               # Monitoring için
pip install redis                # Redis Python client
pip install pika                 # RabbitMQ Python client (opsiyonel, test için)

Versiyon bilgilerini kontrol edelim:

celery --version
python -c "import celery; print(celery.__version__)"

RabbitMQ Kurulumu ve Yapılandırması

RabbitMQ, AMQP protokolünü kullanan, enterprise düzeyde bir message broker. Celery’nin orijinal ve en olgun entegrasyonu RabbitMQ üzerinedir. Karmaşık routing senaryolarında, görev önceliklendirmesinde ve mesaj kalıcılığı kritik olduğunda RabbitMQ’yu tercih ederim.

Ubuntu/Debian Üzerinde RabbitMQ Kurulumu

# Erlang'ı kurun (RabbitMQ buna bağımlı)
apt-get install -y erlang-base erlang-asn1 erlang-crypto erlang-eldap 
    erlang-ftp erlang-inets erlang-mnesia erlang-os-mon 
    erlang-parsetools erlang-public-key erlang-runtime-tools 
    erlang-snmp erlang-ssl erlang-syntax-tools erlang-tftp 
    erlang-tools erlang-xmerl

# RabbitMQ repository ekleyin
curl -1sLf 'https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey' | apt-key add -
add-apt-repository 'deb https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu/ focal main'

apt-get update
apt-get install -y rabbitmq-server

# Servisi başlatın ve enable edin
systemctl start rabbitmq-server
systemctl enable rabbitmq-server
systemctl status rabbitmq-server

RabbitMQ Yönetim Paneli ve Kullanıcı Yapılandırması

# Management plugin'i aktif edin
rabbitmq-plugins enable rabbitmq_management

# Default guest kullanıcısını silmek iyi pratiktir
rabbitmqctl delete_user guest

# Yeni kullanıcı oluşturun
rabbitmqctl add_user celery_user GucluBirSifre123!
rabbitmqctl set_user_tags celery_user administrator

# Celery için özel bir vhost oluşturun
rabbitmqctl add_vhost celery_vhost

# Kullanıcıya bu vhost üzerinde tam yetki verin
rabbitmqctl set_permissions -p celery_vhost celery_user ".*" ".*" ".*"

# Durumu kontrol edin
rabbitmqctl list_users
rabbitmqctl list_vhosts
rabbitmqctl list_permissions -p celery_vhost

Management UI’a http://sunucu_ip:15672 adresinden erişebilirsiniz. Kuyruk durumlarını, mesaj sayılarını ve worker bağlantılarını buradan canlı olarak izleyebilirsiniz.

Redis Kurulumu ve Yapılandırması

Redis, hem broker hem de result backend olarak kullanılabilen, kurulumu son derece basit olan bir çözüm. Ekibinizde zaten Redis varsa (ki büyük ihtimalle vardır, caching için kullanılıyordur), Celery için ayrı bir servis kurmak zorunda kalmadan hızlıca başlayabilirsiniz.

# Ubuntu/Debian
apt-get update
apt-get install -y redis-server

# Temel güvenlik yapılandırması
cat >> /etc/redis/redis.conf << 'EOF'

# Bind sadece localhost'a (production'da gerekirse değiştirin)
bind 127.0.0.1

# Şifre belirleyin
requirepass RedisGucluSifre456!

# Celery için önerilen ayarlar
maxmemory 512mb
maxmemory-policy allkeys-lru

# Persistence ayarları (broker olarak kullanıyorsanız önemli)
appendonly yes
appendfsync everysec
EOF

systemctl restart redis-server
systemctl enable redis-server

# Test edin
redis-cli -a RedisGucluSifre456! ping

Celery Uygulama Yapılandırması

Şimdi asıl Celery yapılandırmasına gelelim. İki broker için de çalışan bir yapı kuracağız.

Temel Proje Yapısı

mkdir celery_project
cd celery_project

touch celery_app.py
touch tasks.py
touch celery_config.py

celery_config.py – Yapılandırma Dosyası

# celery_config.py

import os

# ============================================================
# BROKER AYARLARI - birini seçin
# ============================================================

# RabbitMQ için:
BROKER_URL_RABBITMQ = 'amqp://celery_user:GucluBirSifre123!@localhost:5672/celery_vhost'

# Redis için:
BROKER_URL_REDIS = 'redis://:RedisGucluSifre456!@localhost:6379/0'

# Environment variable'dan al (production'da böyle yapın)
broker_url = os.environ.get('CELERY_BROKER_URL', BROKER_URL_REDIS)

# ============================================================
# RESULT BACKEND
# ============================================================
# Görev sonuçlarını her zaman Redis'te tutmak mantıklı
result_backend = os.environ.get(
    'CELERY_RESULT_BACKEND',
    'redis://:RedisGucluSifre456!@localhost:6379/1'
)

# ============================================================
# TEMEL AYARLAR
# ============================================================
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Istanbul'
enable_utc = True

# Görev sonuçları ne kadar süre saklanacak (saniye)
result_expires = 3600  # 1 saat

# Worker başına maksimum eşzamanlı görev
worker_concurrency = 4

# Her worker kaç görev aldıktan sonra yeniden başlasın
# Bellek sızıntılarına karşı önemli bir ayar
worker_max_tasks_per_child = 1000

# Görev zaman aşımı (saniye)
task_soft_time_limit = 300   # 5 dakika - SoftTimeLimitExceeded fırlatır
task_time_limit = 360        # 6 dakika - Süreci öldürür

# ============================================================
# KUYRUK TANIMLAMALARI
# ============================================================
from kombu import Queue, Exchange

# Exchange tanımları
default_exchange = Exchange('default', type='direct')
priority_exchange = Exchange('priority', type='direct')

task_queues = (
    Queue('default', default_exchange, routing_key='default'),
    Queue('email', default_exchange, routing_key='email'),
    Queue('reports', default_exchange, routing_key='reports'),
    Queue('critical', priority_exchange, routing_key='critical'),
)

task_default_queue = 'default'
task_default_exchange = 'default'
task_default_routing_key = 'default'

# ============================================================
# GÖREV YÖNLENDIRME
# ============================================================
task_routes = {
    'tasks.send_email': {'queue': 'email'},
    'tasks.generate_report': {'queue': 'reports'},
    'tasks.process_payment': {'queue': 'critical'},
}

celery_app.py – Celery Instance

# celery_app.py

from celery import Celery

app = Celery('celery_project')

# Config dosyasından ayarları yükle
app.config_from_object('celery_config')

# Task modüllerini otomatik keşfet
app.autodiscover_tasks(['tasks'])

if __name__ == '__main__':
    app.start()

tasks.py – Örnek Görevler

# tasks.py

import time
import logging
from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded
from celery_app import app

logger = logging.getLogger(__name__)


@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    queue='email'
)
def send_email(self, recipient: str, subject: str, body: str):
    """
    E-posta gönderme görevi.
    Başarısız olursa 3 kez 60 saniye aralıkla yeniden dener.
    """
    try:
        logger.info(f"E-posta gönderiliyor: {recipient}")
        
        # Gerçekte burada smtplib veya sendgrid kullanırsınız
        # Simüle ediyoruz:
        time.sleep(2)
        
        if recipient == "[email protected]":
            raise ConnectionError("SMTP bağlantı hatası")
        
        logger.info(f"E-posta başarıyla gönderildi: {recipient}")
        return {"status": "success", "recipient": recipient}
        
    except ConnectionError as exc:
        logger.warning(f"E-posta gönderilemedi, yeniden denenecek: {exc}")
        raise self.retry(exc=exc)


@app.task(
    bind=True,
    queue='reports',
    soft_time_limit=240,
    time_limit=300
)
def generate_report(self, report_type: str, date_range: dict):
    """
    Büyük veri raporları oluşturma görevi.
    Zaman aşımı koruması ile.
    """
    try:
        logger.info(f"Rapor oluşturuluyor: {report_type}")
        
        # Uzun süren bir işi simüle ediyoruz
        for i in range(10):
            time.sleep(1)
            # İlerleme durumunu güncelle
            self.update_state(
                state='PROGRESS',
                meta={'current': i + 1, 'total': 10, 'percent': (i + 1) * 10}
            )
        
        return {
            "status": "completed",
            "report_type": report_type,
            "rows": 15420
        }
        
    except SoftTimeLimitExceeded:
        logger.error("Rapor oluşturma zaman aşımına uğradı")
        raise


@app.task(queue='critical', priority=9)
def process_payment(order_id: int, amount: float):
    """
    Ödeme işleme görevi - yüksek öncelikli kuyrukta.
    """
    logger.info(f"Ödeme işleniyor: Order #{order_id}, Tutar: {amount} TL")
    
    # Ödeme gateway'e istek simülasyonu
    time.sleep(1)
    
    return {
        "order_id": order_id,
        "amount": amount,
        "transaction_id": f"TXN{order_id}12345",
        "status": "approved"
    }

Worker’ları Çalıştırma

# Tüm queue'ları dinleyen tek worker
celery -A celery_app worker --loglevel=info

# Sadece belirli queue'ları dinleyen worker
celery -A celery_app worker --loglevel=info -Q email,default

# Sadece kritik işler için ayrı worker
celery -A celery_app worker --loglevel=info -Q critical --concurrency=8

# Worker'ı detach modunda çalıştır (daemon)
celery -A celery_app worker --loglevel=info 
    --logfile=/var/log/celery/worker.log 
    --pidfile=/var/run/celery/worker.pid 
    --detach

Systemd ile Production Kurulumu

Development’ta terminal’de çalıştırmak yeterli ama production’da Celery worker’larının sistem servisi olarak yönetilmesi şart.

# /etc/systemd/system/celery-worker.service

cat > /etc/systemd/system/celery-worker.service << 'EOF'
[Unit]
Description=Celery Worker Service
After=network.target rabbitmq-server.service redis-server.service

[Service]
Type=forking
User=www-data
Group=www-data
WorkingDirectory=/opt/celery_project
Environment="PATH=/opt/celery_project/celery_env/bin"
Environment="CELERY_BROKER_URL=amqp://celery_user:GucluBirSifre123!@localhost:5672/celery_vhost"
Environment="CELERY_RESULT_BACKEND=redis://:RedisGucluSifre456!@localhost:6379/1"

ExecStart=/opt/celery_project/celery_env/bin/celery 
    -A celery_app worker 
    --loglevel=info 
    --logfile=/var/log/celery/worker.log 
    --pidfile=/var/run/celery/worker.pid 
    --concurrency=4 
    --queues=default,email,reports,critical 
    --detach

ExecStop=/opt/celery_project/celery_env/bin/celery 
    -A celery_app control shutdown

ExecReload=/bin/kill -s HUP $MAINPID

RuntimeDirectory=celery
RuntimeDirectoryMode=0755

Restart=always
RestartSec=10s

[Install]
WantedBy=multi-user.target
EOF

# Log dizinini oluşturun
mkdir -p /var/log/celery
chown www-data:www-data /var/log/celery

systemctl daemon-reload
systemctl enable celery-worker
systemctl start celery-worker
systemctl status celery-worker

Flower ile İzleme

Flower, Celery için gerçek zamanlı web tabanlı monitoring aracı. Production’da Flower olmadan çalışmak kör uçuşa benzer.

# Flower'ı başlatın
celery -A celery_app flower 
    --port=5555 
    --broker=amqp://celery_user:GucluBirSifre123!@localhost:5672/celery_vhost 
    --basic_auth=admin:FlowerSifresi 
    --persistent=True 
    --db=/var/lib/flower/flower.db

# Ya da Redis broker ile
celery -A celery_app flower 
    --port=5555 
    --broker=redis://:RedisGucluSifre456!@localhost:6379/0 
    --basic_auth=admin:FlowerSifresi

Flower arayüzünde şunları görebilirsiniz:

  • Aktif, tamamlanan ve başarısız görevler
  • Worker durumları ve işlemci/bellek kullanımı
  • Queue derinlikleri ve throughput metrikleri
  • Görev detayları ve argümanları

RabbitMQ mu, Redis mi?

Bu soru her projede karşıma çıkar. Kısa cevap: ikisi de işe yarar, ama farklı senaryolarda farklı avantajları var.

RabbitMQ’yu tercih edin:

  • Karmaşık mesaj routing ihtiyacınız varsa (topic exchange, fanout)
  • Mesaj kalıcılığı ve kayıp toleransı sıfır olmalıysa
  • Farklı diller arasında mesajlaşmanız gerekiyorsa
  • Büyük ölçekli, yüksek hacimli sistemlerde

Redis’i tercih edin:

  • Zaten altyapınızda Redis varsa
  • Hızlı kurulum ve düşük operasyonel yük istiyorsanız
  • Hem broker hem backend için tek servis yeterliyse
  • Ekibiniz Redis’e hakimse ve RabbitMQ öğrenme maliyeti istemiyorsanız

Benim genel kuralım şu: Startup veya orta ölçekli projelerde Redis ile başlayın, kurulum hızlı ve yönetimi kolay. Kritik finansal işlemler veya enterprise projeler için RabbitMQ’ya yatırım yapın, mesaj garantisi ve routing esnekliği değer.

Hata Ayıklama İpuçları

Yaygın karşılaşılan durumlar ve çözümleri:

  • Worker’lar görevi almıyor: celery -A celery_app inspect active ile bağlı worker’ları kontrol edin. Broker URL’inin doğru olduğundan ve firewall’ın porta izin verdiğinden emin olun.
  • Görevler kuyrukta birikip işlenmiyor: celery -A celery_app inspect reserved ile worker’ın görev alıp almadığına bakın. Concurrency ayarını ve worker sayısını gözden geçirin.
  • Result backend’e yazılamıyor: Redis şifresini ve erişim izinlerini kontrol edin. result_expires değerinin çok düşük olmadığından emin olun.
  • Memory leak şüphesi: worker_max_tasks_per_child değerini düşürün. Worker’ların bellek kullanımını Flower üzerinden izleyin.
# Celery durumunu komut satırından inceleme
celery -A celery_app inspect active       # Aktif görevler
celery -A celery_app inspect scheduled    # Zamanlanmış görevler
celery -A celery_app inspect reserved     # Queue'da bekleyenler
celery -A celery_app inspect stats        # Worker istatistikleri
celery -A celery_app control purge        # Tüm queue'yu temizle (dikkatli!)

Sonuç

Celery’yi ilk kurduğumda en çok zaman harcadığım nokta doğru konfigürasyonu bulmaktı. Dokümantasyon geniş ama “benim durumum için ne kullanmalıyım” sorusu her zaman pratik deneyimden geçiyor.

Özetle şu adımları izleyin: broker’ı (Redis ile başlayın) kurun ve yapılandırın, result backend’i ayrı bir Redis DB’ye yönlendirin, görevlerinizi anlamlı queue’lara bölün, worker’ları systemd ile yönetin ve Flower’ı mutlaka açın. Bu temel yapı, küçük bir projeden orta ölçekli bir sisteme kadar sizi götürür.

Zamanlanmış görevler (celery beat), görev zincirleri (chain, chord, group) ve dağıtık kilitleme gibi ileri konuları sonraki yazılarda ele alacağız. Aklınıza takılan soruları yorumlarda paylaşabilirsiniz.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir