Django Uygulamasına Celery Entegrasyonu

Bir Django projesinde “şu işi arka planda yap” dediğin an, Celery kaçınılmaz oluyor. E-posta gönderimi, PDF oluşturma, üçüncü parti API çağrıları, periyodik raporlar… Bunların hepsini request-response döngüsü içinde yapmaya çalışırsan, kullanıcı ya timeout alır ya da ekranın önünde dakikalarca bekler. Ben bu acıyı yaşadım, production’da kullanıcı şikayetleri gelmeye başlayınca Celery entegrasyonunu aceleyle yapmak zorunda kaldım. O yüzden bu yazıyı “önceden doğru yapsan keşke” bakış açısıyla yazıyorum.

Celery Nedir, Django ile Neden Birlikte Kullanılır?

Celery, Python ekosisteminin en olgun dağıtık görev kuyruğu sistemidir. Temel mantığı şu: Django uygulamanı bir görevi kuyruğa atar (produce), Celery worker’ları bu görevi alır ve arka planda çalıştırır. Bu arada Django kullanıcıya hemen cevap dönmüş olur.

Broker olarak genellikle Redis ya da RabbitMQ kullanılır. Ben Redis’i tercih ediyorum çünkü zaten çoğu projede cache için Redis var, ekstra bir servis ayağa kaldırmak yerine aynı instance’ı kullanabiliyorsunuz. RabbitMQ daha güçlü bir mesaj broker’ı olmakla birlikte, basit ila orta ölçekli projelerde Redis’in sadeliği kazanıyor.

Mimari şu şekilde işliyor:

  • Django uygulaması görevi Redis kuyruğuna yazar
  • Celery worker süreci kuyruğu dinler, görevi çeker
  • Görev çalıştırılır, sonuç isteğe bağlı olarak result backend’e yazılır
  • Celery Beat (scheduler) periyodik görevler için ayrıca devreye girer

Kurulum ve Temel Yapılandırma

Önce gerekli paketleri kuralım:

pip install celery redis django-celery-beat django-celery-results
pip freeze > requirements.txt

django-celery-beat periyodik görevleri veritabanından yönetmeni sağlar. django-celery-results ise görev sonuçlarını Django ORM üzerinden saklar, admin panelinden takip edebilirsin.

Proje yapısı şu şekilde olsun:

myproject/
    myproject/
        __init__.py
        celery.py
        settings.py
    myapp/
        tasks.py
    manage.py

myproject/celery.py dosyasını oluşturuyoruz:

import os
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')

# Django settings'ten CELERY_ prefix'li ayarları oku
app.config_from_object('django.conf:settings', namespace='CELERY')

# Tüm Django app'lerindeki tasks.py dosyalarını otomatik keşfet
app.autodiscover_tasks()


@app.task(bind=True, ignore_result=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

myproject/__init__.py dosyasını düzenliyoruz, Django başlarken Celery app’i de yüklensin:

from .celery import app as celery_app

__all__ = ('celery_app',)

settings.py içine Celery yapılandırmasını ekliyoruz:

# Celery Ayarları
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'django-db'  # django-celery-results için
CELERY_CACHE_BACKEND = 'django-cache'

CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/Istanbul'

# Görev sonuçları 1 gün sonra silinsin
CELERY_RESULT_EXPIRES = 86400

# Tek bir worker'ın aynı anda alacağı görev sayısı
CELERY_WORKER_PREFETCH_MULTIPLIER = 1

# Uzun süren görevler için task_acks_late önemli
CELERY_TASK_ACKS_LATE = True

INSTALLED_APPS = [
    ...
    'django_celery_beat',
    'django_celery_results',
]

Veritabanı tablolarını oluşturalım:

python manage.py migrate

İlk Görevleri Yazmak

myapp/tasks.py içinde gerçek dünya senaryolarına bakalım. Kullanıcı kayıt olduğunda hoşgeldin e-postası göndermek klasik bir örnek:

from celery import shared_task
from celery.utils.log import get_task_logger
from django.core.mail import send_mail
from django.contrib.auth import get_user_model
import time

logger = get_task_logger(__name__)

User = get_user_model()


@shared_task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,  # 60 saniye bekle, sonra tekrar dene
    name='myapp.tasks.send_welcome_email'
)
def send_welcome_email(self, user_id):
    """
    Kullanıcıya hoşgeldin e-postası gönderir.
    bind=True: self parametresi üzerinden task instance'ına erişim sağlar.
    """
    try:
        user = User.objects.get(pk=user_id)
        send_mail(
            subject='Hoş Geldiniz!',
            message=f'Merhaba {user.first_name}, platforma hoş geldiniz.',
            from_email='[email protected]',
            recipient_list=[user.email],
            fail_silently=False,
        )
        logger.info(f'Hoşgeldin e-postası gönderildi: {user.email}')
        return {'status': 'success', 'user_id': user_id}

    except User.DoesNotExist:
        # Kullanıcı bulunamadı, retry etmenin anlamı yok
        logger.error(f'Kullanıcı bulunamadı: {user_id}')
        return {'status': 'error', 'message': 'Kullanıcı bulunamadı'}

    except Exception as exc:
        logger.warning(f'E-posta gönderilemedi, tekrar denenecek: {exc}')
        # Exponential backoff: her denemede bekleme süresi artar
        raise self.retry(exc=exc, countdown=2 ** self.request.retries * 30)


@shared_task(name='myapp.tasks.generate_report')
def generate_report(report_type, start_date, end_date, user_id):
    """
    Ağır raporlama işlemleri için örnek görev.
    Bu tür işlemleri asla view içinde yapma.
    """
    logger.info(f'{report_type} raporu oluşturuluyor: {start_date} - {end_date}')

    # Raporlama mantığı burada...
    # Veritabanı sorguları, dosya oluşturma vb.
    time.sleep(5)  # Simülasyon

    # Raporu kullanıcıya bildir
    send_welcome_email.delay(user_id)  # Başka bir task'ı tetikleyebilirsin

    return {'status': 'completed', 'report_type': report_type}

View’dan görevi tetiklemek çok basit:

# myapp/views.py
from django.contrib.auth.decorators import login_required
from django.http import JsonResponse
from .tasks import send_welcome_email, generate_report


@login_required
def trigger_report(request):
    if request.method == 'POST':
        report_type = request.POST.get('type', 'monthly')

        # .delay() en sık kullanılan yöntem
        task = generate_report.delay(
            report_type=report_type,
            start_date='2024-01-01',
            end_date='2024-01-31',
            user_id=request.user.id
        )

        # task.id ile frontend'den durum sorgulanabilir
        return JsonResponse({
            'status': 'queued',
            'task_id': task.id
        })


@login_required
def task_status(request, task_id):
    from celery.result import AsyncResult
    result = AsyncResult(task_id)

    response = {
        'task_id': task_id,
        'status': result.status,  # PENDING, STARTED, SUCCESS, FAILURE, RETRY
    }

    if result.status == 'SUCCESS':
        response['result'] = result.result
    elif result.status == 'FAILURE':
        response['error'] = str(result.result)

    return JsonResponse(response)

Görev Durumunu Takip Etmek

Uzun süren işlemlerde kullanıcıya ilerleme durumu göstermek isteyebilirsin. Celery’nin update_state metodu tam bu iş için:

@shared_task(bind=True, name='myapp.tasks.bulk_import')
def bulk_import(self, file_path, user_id):
    """
    CSV dosyasından toplu veri aktarımı.
    İlerleme durumu frontend'e bildiriliyor.
    """
    import csv

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            rows = list(reader)

        total = len(rows)
        processed = 0
        errors = []

        for i, row in enumerate(rows):
            try:
                # Veri işleme mantığı
                # MyModel.objects.create(**row)
                processed += 1
            except Exception as e:
                errors.append({'row': i + 1, 'error': str(e)})

            # Her 10 kayıtta bir durum güncelle
            if i % 10 == 0:
                self.update_state(
                    state='PROGRESS',
                    meta={
                        'current': i + 1,
                        'total': total,
                        'percent': int((i + 1) / total * 100),
                        'errors': len(errors)
                    }
                )

        return {
            'status': 'completed',
            'processed': processed,
            'errors': errors,
            'total': total
        }

    except FileNotFoundError:
        raise ValueError(f'Dosya bulunamadı: {file_path}')

Periyodik Görevler (Celery Beat)

Her gece yedek almak, her sabah özet rapor göndermek, saatlik cache temizliği… Bunlar için Celery Beat kullanıyoruz. django-celery-beat ile bu tanımları admin panelinden dinamik olarak yönetebilirsin, kod değiştirmene gerek kalmaz.

Kod tarafında settings.py içinde static schedule tanımlamak için:

from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'gunluk-ozet-raporu': {
        'task': 'myapp.tasks.send_daily_summary',
        'schedule': crontab(hour=8, minute=0),  # Her gün 08:00
        'options': {'queue': 'low_priority'}
    },
    'saatlik-cache-temizligi': {
        'task': 'myapp.tasks.clear_expired_cache',
        'schedule': crontab(minute=0),  # Her saatin başında
    },
    'her-5-dakikada-kontrol': {
        'task': 'myapp.tasks.health_check',
        'schedule': 300.0,  # 300 saniye = 5 dakika
    },
}

Kuyruk Yönetimi ve Önceliklendirme

Production’da tüm görevleri aynı kuyruğa atmak ciddi sorunlara yol açar. Kritik bir ödeme bildirimi, saatlik cache temizliği ile aynı kuyruğa girerse, kuyruğun yoğun olduğu anlarda ödeme bildirimi dakikalarca bekleyebilir.

Birden fazla kuyruk tanımlamak ve worker’ları bu kuyruklara atamak bu sorunu çözer:

# settings.py
from kombu import Queue, Exchange

CELERY_TASK_QUEUES = (
    Queue('critical', Exchange('critical'), routing_key='critical'),
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('low_priority', Exchange('low_priority'), routing_key='low_priority'),
)

CELERY_TASK_DEFAULT_QUEUE = 'default'
CELERY_TASK_DEFAULT_EXCHANGE = 'default'
CELERY_TASK_DEFAULT_ROUTING_KEY = 'default'

CELERY_TASK_ROUTES = {
    'myapp.tasks.send_welcome_email': {'queue': 'critical'},
    'myapp.tasks.generate_report': {'queue': 'low_priority'},
    'myapp.tasks.bulk_import': {'queue': 'low_priority'},
}

Worker’ları farklı kuyrukları dinleyecek şekilde başlatmak:

# Kritik görevler için ayrı worker (daha fazla concurrency)
celery -A myproject worker -Q critical --concurrency=8 --loglevel=info -n worker_critical@%h

# Varsayılan kuyruk için worker
celery -A myproject worker -Q default --concurrency=4 --loglevel=info -n worker_default@%h

# Düşük öncelikli görevler için worker (az concurrency, kaynak hog etmesin)
celery -A myproject worker -Q low_priority --concurrency=2 --loglevel=info -n worker_low@%h

# Beat scheduler'ı başlatmak
celery -A myproject beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler

Systemd ile Production’a Almak

Geliştirme ortamında terminal’den çalıştırman yeterli, ama production’da Celery worker’larının sistem servisi olarak çalışması gerekiyor. İşte temel bir systemd unit dosyası:

# /etc/systemd/system/celery-worker.service

[Unit]
Description=Celery Worker - myproject
After=network.target redis.service

[Service]
Type=forking
User=www-data
Group=www-data
WorkingDirectory=/var/www/myproject
Environment="DJANGO_SETTINGS_MODULE=myproject.settings.production"
ExecStart=/var/www/myproject/venv/bin/celery -A myproject multi start worker_default 
    -Q default,critical 
    --concurrency=4 
    --logfile=/var/log/celery/%n%I.log 
    --pidfile=/var/run/celery/%n.pid 
    --loglevel=info
ExecStop=/var/www/myproject/venv/bin/celery multi stopwait worker_default 
    --pidfile=/var/run/celery/%n.pid
ExecReload=/var/www/myproject/venv/bin/celery multi restart worker_default 
    --pidfile=/var/run/celery/%n.pid 
    --logfile=/var/log/celery/%n%I.log
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target
# Log ve pid dizinleri oluştur
sudo mkdir -p /var/log/celery /var/run/celery
sudo chown www-data:www-data /var/log/celery /var/run/celery

# Servisi etkinleştir
sudo systemctl daemon-reload
sudo systemctl enable celery-worker
sudo systemctl start celery-worker
sudo systemctl status celery-worker

Sık Yapılan Hatalar ve Çözümleri

Production deneyiminden öğrendiğim birkaç kritik noktayı paylaşayım:

Task parametrelerine dikkat et. Görevlere Django model instance’ı geçme. user=request.user gibi bir parametre, serileştirme sırasında sorun çıkarır. Her zaman ID geç: user_id=request.user.id. Worker tarafında ID ile nesneyi tekrar çekebilirsin.

Database bağlantı havuzu. Her Celery worker’ı ayrı bir işlem, her işlem için ayrı DB bağlantısı gerekiyor. CONN_MAX_AGE ayarını gözden geçir. Yüksek concurrency’de bağlantı havuzu dolabilir.

Görevleri idempotent yaz. Bir görev başarısız olup retry edildiğinde, aynı işlemi iki kez yapmış olabilirsin. Örneğin e-posta gönderimi; retry öncesinde e-postanın zaten gönderilip gönderilmediğini kontrol et.

Soft ve hard time limit kullan. Sonsuz döngüye giren veya beklenmedik şekilde uzayan görevler worker’ı bloklar:

@shared_task(
    soft_time_limit=300,  # 5 dakikada uyarı exception fırlatır
    time_limit=360        # 6 dakikada işlemi zorla keser
)
def long_running_task():
    from celery.exceptions import SoftTimeLimitExceeded
    try:
        # Uzun işlem...
        pass
    except SoftTimeLimitExceeded:
        # Temizlik yap, kaynakları serbest bırak
        logger.error("Görev zaman aşımına uğradı, temizlik yapılıyor")
        raise

Flower ile izleme. Celery’nin web tabanlı monitoring aracı Flower’ı production’a almak ciddi bir kolaylık sağlar:

pip install flower
celery -A myproject flower --port=5555 --basic_auth=admin:guclu_sifre

Hangi worker’ın ne yaptığını, başarısız görevleri, kuyruk derinliklerini gerçek zamanlı görebilirsin. Bir şeyler ters gidince terminale bakıp log dosyalarını karıştırmak yerine Flower’ı açıyorum ilk iş.

Sonuç

Celery entegrasyonu ilk bakışta karmaşık görünebilir; broker, worker, beat, result backend… Ama bir kez doğru kurulunca Django uygulamanın esnekliği inanılmaz artıyor. Kullanıcı deneyimini doğrudan etkileyen işlemleri senkrondan asenkrona taşımak, özellikle yük altında performansı ciddi şekilde iyileştiriyor.

Önerdiğim yaklaşım: önce Redis ve tek bir worker ile başla, her şeyi default kuyruğa at. Sistem büyüdükçe önceliklendirme ekle, worker sayını artır. Aşırı mühendislik yapmadan ihtiyaca göre büyümek Celery’nin güzel tarafı.

Son bir şey: görev kodunu asla view’ın içine gömme, her zaman tasks.py içinde tut. İlerleyen süreçte test yazmak, refactor etmek ve hata ayıklamak çok daha kolay olacak.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir