Celery ile FastAPI Entegrasyonu: Asenkron Görev Yönetimi

Prodüksiyonda bir FastAPI servisi çalıştırdınız, her şey güzel gidiyor, kullanıcılar mutlu. Sonra bir gün “e-posta gönder”, “PDF üret”, “üçüncü parti API’ye istek at” gibi gereksinimler gelmeye başlıyor. Bu işlemleri doğrudan endpoint içinde yaparsanız response süreleriniz anında 2-3 saniyeye fırlıyor. İşte tam bu noktada Celery devreye giriyor.

Bu yazıda gerçek bir projede nasıl kurulum yaptığımı, hangi hatalarla karşılaştığımı ve production’da nelere dikkat etmeniz gerektiğini anlatacağım. Teorik değil, elle tutulur bir rehber olmasını istedim.

Mimariyi Anlamak

Önce kafanızda net bir şema oluşturalım. FastAPI bir HTTP isteği aldığında normalde o isteği işleyip yanıt döner. Eğer işlem uzun sürüyorsa kullanıcı beklemek zorunda kalır. Celery bu senaryoda şöyle çalışır:

  • FastAPI görevi bir kuyruğa (broker) yazar
  • Celery worker o kuyruğu dinler ve görevi alır
  • Worker görevi arka planda işler
  • Sonucu bir yerde (backend) saklar
  • FastAPI istediğinde o sonucu sorgular

Broker olarak Redis veya RabbitMQ kullanılır. Ben bu yazıda Redis’i tercih ettim çünkü zaten çoğu projede cache için Redis var ve ikinci bir servis kurmak istemiyorsunuz.

Kurulum ve Temel Yapı

Önce bağımlılıkları kuralım:

pip install fastapi celery redis uvicorn flower
pip install "celery[redis]"

Proje yapısı şöyle olacak:

myproject/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── celery_app.py
│   ├── tasks/
│   │   ├── __init__.py
│   │   ├── email_tasks.py
│   │   └── report_tasks.py
│   └── routers/
│       ├── __init__.py
│       └── notifications.py
├── celeryconfig.py
└── requirements.txt

Bu yapıyı biraz aşırı mühendislik gibi görebilirsiniz, ama prodüksiyonda task’larınız büyüdükçe hepsini tek dosyaya doldurmak gerçekten sıkıntı çıkarıyor.

Celery Uygulamasını Oluşturmak

# app/celery_app.py
from celery import Celery
from kombu import Exchange, Queue

celery_app = Celery(
    "myproject",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
    include=[
        "app.tasks.email_tasks",
        "app.tasks.report_tasks",
    ]
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="Europe/Istanbul",
    enable_utc=True,
    task_track_started=True,
    task_acks_late=True,
    worker_prefetch_multiplier=1,
    result_expires=3600,
)

# Queue tanımları - önemli!
celery_app.conf.task_queues = (
    Queue("default", Exchange("default"), routing_key="default"),
    Queue("high_priority", Exchange("high_priority"), routing_key="high_priority"),
    Queue("reports", Exchange("reports"), routing_key="reports"),
)

celery_app.conf.task_default_queue = "default"
celery_app.conf.task_default_exchange = "default"
celery_app.conf.task_default_routing_key = "default"

Birkaç önemli ayarı açıklayayım:

  • task_acks_late: Worker görevi aldıktan sonra değil, başarıyla tamamladıktan sonra kuyruğa “aldım” sinyali gönderir. Worker crash olursa görev kaybolmaz
  • worker_prefetch_multiplier=1: Her worker aynı anda sadece bir görev alır. Uzun süren görevleriniz varsa bu çok önemli, yoksa bir worker tüm ağır görevleri kendi üzerine çekip diğerleri boş kalır
  • task_track_started: Görevi “STARTED” durumuna alır, production’da task izleme için şart

Task Yazımı

# app/tasks/email_tasks.py
from app.celery_app import celery_app
from celery.utils.log import get_task_logger
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

logger = get_task_logger(__name__)

@celery_app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    name="tasks.send_welcome_email"
)
def send_welcome_email(self, user_id: int, email: str, username: str):
    logger.info(f"Kullanıcıya hoş geldiniz e-postası gönderiliyor: {email}")
    
    try:
        msg = MIMEMultipart()
        msg["From"] = "[email protected]"
        msg["To"] = email
        msg["Subject"] = f"Hoş geldiniz, {username}!"
        
        body = f"Merhaba {username}, hesabınız başarıyla oluşturuldu."
        msg.attach(MIMEText(body, "plain", "utf-8"))
        
        with smtplib.SMTP("smtp.sirketim.com", 587) as server:
            server.starttls()
            server.login("[email protected]", "supersecretpassword")
            server.sendmail("[email protected]", email, msg.as_string())
        
        logger.info(f"E-posta başarıyla gönderildi: {email}")
        return {"status": "success", "email": email}
        
    except smtplib.SMTPException as exc:
        logger.error(f"SMTP hatası: {exc}")
        raise self.retry(exc=exc, countdown=2 ** self.request.retries * 60)
    except Exception as exc:
        logger.error(f"Beklenmedik hata: {exc}")
        raise self.retry(exc=exc)

bind=True parametresi self ile task nesnesine erişmenizi sağlar. Retry mekanizması için bu şart. Exponential backoff yapıyorum burada: ilk retry 60 saniye, ikinci 120, üçüncü 240 saniye bekliyor. SMTP sunucunuz yük altındayken bunu gerçekten takdir edeceksiniz.

Rapor Görevi – Gerçek Dünya Senaryosu

# app/tasks/report_tasks.py
from app.celery_app import celery_app
from celery.utils.log import get_task_logger
import time

logger = get_task_logger(__name__)

@celery_app.task(
    bind=True,
    name="tasks.generate_monthly_report",
    queue="reports",
    soft_time_limit=300,
    time_limit=360
)
def generate_monthly_report(self, company_id: int, month: int, year: int):
    """
    Aylık rapor üretimi. Uzun sürebilir, ayrı queue'da çalışır.
    soft_time_limit: Bu sürede bitmezse SoftTimeLimitExceeded fırlatır
    time_limit: Bu sürede bitmezse process kesinlikle öldürülür
    """
    try:
        # Progress tracking
        self.update_state(
            state="PROGRESS",
            meta={"current": 0, "total": 100, "status": "Veriler toplanıyor..."}
        )
        
        # Simüle edilmiş uzun işlem
        # Gerçekte DB'den veri çekme, hesaplama vs.
        for i in range(1, 101):
            time.sleep(0.1)  # Gerçek işlem burada
            if i % 10 == 0:
                self.update_state(
                    state="PROGRESS",
                    meta={
                        "current": i,
                        "total": 100,
                        "status": f"İşleniyor... %{i}"
                    }
                )
        
        result = {
            "company_id": company_id,
            "month": month,
            "year": year,
            "report_url": f"/reports/{company_id}/{year}/{month}/report.pdf",
            "generated_at": "2024-01-15T10:30:00"
        }
        
        return result
        
    except Exception as exc:
        logger.error(f"Rapor üretim hatası: {exc}")
        self.update_state(
            state="FAILURE",
            meta={"error": str(exc)}
        )
        raise

FastAPI Entegrasyonu

# app/main.py
from fastapi import FastAPI
from app.routers import notifications
from app.celery_app import celery_app

app = FastAPI(title="MyProject API")

app.include_router(notifications.router, prefix="/api/v1")

@app.get("/health")
async def health_check():
    # Celery broker bağlantısını da kontrol et
    try:
        celery_app.control.ping(timeout=1)
        celery_status = "healthy"
    except Exception:
        celery_status = "unhealthy"
    
    return {
        "api": "healthy",
        "celery": celery_status
    }
# app/routers/notifications.py
from fastapi import APIRouter, HTTPException, BackgroundTasks
from pydantic import BaseModel, EmailStr
from celery.result import AsyncResult
from app.tasks.email_tasks import send_welcome_email
from app.tasks.report_tasks import generate_monthly_report

router = APIRouter()

class UserRegistrationRequest(BaseModel):
    user_id: int
    email: EmailStr
    username: str

class ReportRequest(BaseModel):
    company_id: int
    month: int
    year: int

@router.post("/send-welcome-email")
async def trigger_welcome_email(request: UserRegistrationRequest):
    task = send_welcome_email.delay(
        user_id=request.user_id,
        email=request.email,
        username=request.username
    )
    
    return {
        "message": "E-posta kuyruğa eklendi",
        "task_id": task.id,
        "status_url": f"/api/v1/tasks/{task.id}"
    }

@router.post("/generate-report")
async def trigger_report_generation(request: ReportRequest):
    # apply_async ile daha fazla kontrol
    task = generate_monthly_report.apply_async(
        args=[request.company_id, request.month, request.year],
        queue="reports",
        countdown=0,
        expires=3600  # 1 saat içinde başlamazsa iptal et
    )
    
    return {
        "message": "Rapor üretimi başlatıldı",
        "task_id": task.id
    }

@router.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    task_result = AsyncResult(task_id)
    
    if task_result.state == "PENDING":
        response = {
            "task_id": task_id,
            "state": task_result.state,
            "status": "Görev kuyruğa alındı, işlenmeyi bekliyor"
        }
    elif task_result.state == "PROGRESS":
        response = {
            "task_id": task_id,
            "state": task_result.state,
            "progress": task_result.info
        }
    elif task_result.state == "SUCCESS":
        response = {
            "task_id": task_id,
            "state": task_result.state,
            "result": task_result.result
        }
    elif task_result.state == "FAILURE":
        response = {
            "task_id": task_id,
            "state": task_result.state,
            "error": str(task_result.info)
        }
    else:
        response = {
            "task_id": task_id,
            "state": task_result.state
        }
    
    return response

Worker’ı Başlatmak

Geliştirme ortamında:

# Default queue için worker
celery -A app.celery_app worker --loglevel=info -Q default

# Reports queue için ayrı worker (ağır işler için)
celery -A app.celery_app worker --loglevel=info -Q reports --concurrency=2 -n reports_worker@%h

# Her iki queue'yu da dinleyen worker
celery -A app.celery_app worker --loglevel=info -Q default,high_priority --concurrency=4

Flower ile monitoring:

celery -A app.celery_app flower --port=5555 --broker=redis://localhost:6379/0

Production için systemd service dosyası:

# /etc/systemd/system/celery-worker.service
[Unit]
Description=Celery Worker Service
After=network.target redis.service

[Service]
Type=forking
User=appuser
Group=appuser
WorkingDirectory=/opt/myproject
Environment="PATH=/opt/myproject/venv/bin"
ExecStart=/opt/myproject/venv/bin/celery -A app.celery_app multi start 
    default_worker 
    reports_worker 
    --pidfile=/var/run/celery/%n.pid 
    --logfile=/var/log/celery/%n%I.log 
    --loglevel=info 
    -Q:default_worker default,high_priority 
    -Q:reports_worker reports 
    --concurrency:default_worker=4 
    --concurrency:reports_worker=2
ExecStop=/opt/myproject/venv/bin/celery multi stopwait 
    default_worker reports_worker 
    --pidfile=/var/run/celery/%n.pid
ExecReload=/opt/myproject/venv/bin/celery multi restart 
    default_worker reports_worker 
    --pidfile=/var/run/celery/%n.pid 
    --logfile=/var/log/celery/%n%I.log
RuntimeDirectory=celery
RuntimeDirectoryMode=0755
StandardError=syslog
SyslogIdentifier=celery

[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl enable celery-worker
sudo systemctl start celery-worker
sudo systemctl status celery-worker

Periyodik Görevler – Celery Beat

Zamanlanmış görevler için Celery Beat kullanıyoruz:

# celeryconfig.py
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    "generate-monthly-reports": {
        "task": "tasks.generate_monthly_report",
        "schedule": crontab(hour=2, minute=0, day_of_month=1),
        "args": (1, None, None),  # company_id, month, year - None olanlar hesaplanır
        "options": {"queue": "reports"}
    },
    "cleanup-old-results": {
        "task": "tasks.cleanup_expired_results",
        "schedule": crontab(hour=3, minute=30),
        "options": {"queue": "default"}
    }
}
# Beat servisini başlatmak
celery -A app.celery_app beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler

Sık Karşılaşılan Sorunlar ve Çözümleri

Circular import problemi: FastAPI ve Celery aynı modülleri import ettiğinde döngüsel bağımlılık oluşuyor. Bunu çözmek için Celery uygulamasını ayrı bir modülde tutun ve task’larda FastAPI bileşenlerini import etmeyin.

Task’ların serialize edilememesi: Pydantic model nesnelerini direkt task’a geçmeyin. Task parametrelerinizi her zaman primitive tipler olarak tanımlayın (int, str, dict). Task içinde gerekirse nesneyi yeniden oluşturun.

Worker bellek sızıntısı: Uzun süre çalışan worker’larda bellek artıyor mu? --max-tasks-per-child parametresiyle worker’ları belirli sayıda task sonrası yeniden başlatın:

celery -A app.celery_app worker --max-tasks-per-child=100 --loglevel=info

Redis bağlantı kopukluğu: Production’da Redis bağlantısı zaman zaman kopabilir. Broker transport seçeneklerine retry ayarları ekleyin:

celery_app.conf.broker_transport_options = {
    "visibility_timeout": 3600,
    "fanout_prefix": True,
    "fanout_patterns": True,
    "socket_connect_timeout": 5,
    "socket_timeout": 5,
    "retry_on_timeout": True,
}

Sonuç

FastAPI ile Celery entegrasyonu ilk bakışta karmaşık görünüyor, ama doğru yapılandırıldığında inanılmaz güçlü bir kombinasyon ortaya çıkıyor. Birkaç kritik noktayı tekrar vurgulayayım:

  • Ayrı queue’lar kullanın. Ağır raporlama işlemleri ile hızlı bildirim işlemlerini aynı kuyruğa koymayın. Yoksa bir rapor görevi kuyruğu tıkar ve e-postalarınız saatlerce bekler.
  • Retry mekanizmasını iyi tasarlayın. Exponential backoff’u hiçbir zaman atlamamalısınız, özellikle üçüncü parti servislere bağımlısanız.
  • Task’larınızı idempotent yazın. Aynı task iki kez çalışsa da sonuç değişmemeli. task_acks_late ile birleşince worker crash senaryosunda hayat kurtarır.
  • Flower’ı production’a açmayın. Monitoring için kullanın ama authentication olmadan internete açık bırakmak ciddi güvenlik riski.
  • Log’larınızı takip edin. get_task_logger kullanın ve task ID’leri her zaman log’a yazın. Bir şeyler ters gittiğinde task ID üzerinden tüm süreci izleyebilirsiniz.

Bu yapıyı birkaç farklı projede kullandım ve response sürelerini ortalama 2-3 saniyeden 50-100 milisaniyeye düşürdüm. Kullanıcı tarafında fark gece gündüz gibi.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir