Celery ile FastAPI Entegrasyonu: Asenkron Görev Yönetimi
Prodüksiyonda bir FastAPI servisi çalıştırdınız, her şey güzel gidiyor, kullanıcılar mutlu. Sonra bir gün “e-posta gönder”, “PDF üret”, “üçüncü parti API’ye istek at” gibi gereksinimler gelmeye başlıyor. Bu işlemleri doğrudan endpoint içinde yaparsanız response süreleriniz anında 2-3 saniyeye fırlıyor. İşte tam bu noktada Celery devreye giriyor.
Bu yazıda gerçek bir projede nasıl kurulum yaptığımı, hangi hatalarla karşılaştığımı ve production’da nelere dikkat etmeniz gerektiğini anlatacağım. Teorik değil, elle tutulur bir rehber olmasını istedim.
Mimariyi Anlamak
Önce kafanızda net bir şema oluşturalım. FastAPI bir HTTP isteği aldığında normalde o isteği işleyip yanıt döner. Eğer işlem uzun sürüyorsa kullanıcı beklemek zorunda kalır. Celery bu senaryoda şöyle çalışır:
- FastAPI görevi bir kuyruğa (broker) yazar
- Celery worker o kuyruğu dinler ve görevi alır
- Worker görevi arka planda işler
- Sonucu bir yerde (backend) saklar
- FastAPI istediğinde o sonucu sorgular
Broker olarak Redis veya RabbitMQ kullanılır. Ben bu yazıda Redis’i tercih ettim çünkü zaten çoğu projede cache için Redis var ve ikinci bir servis kurmak istemiyorsunuz.
Kurulum ve Temel Yapı
Önce bağımlılıkları kuralım:
pip install fastapi celery redis uvicorn flower
pip install "celery[redis]"
Proje yapısı şöyle olacak:
myproject/
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── celery_app.py
│ ├── tasks/
│ │ ├── __init__.py
│ │ ├── email_tasks.py
│ │ └── report_tasks.py
│ └── routers/
│ ├── __init__.py
│ └── notifications.py
├── celeryconfig.py
└── requirements.txt
Bu yapıyı biraz aşırı mühendislik gibi görebilirsiniz, ama prodüksiyonda task’larınız büyüdükçe hepsini tek dosyaya doldurmak gerçekten sıkıntı çıkarıyor.
Celery Uygulamasını Oluşturmak
# app/celery_app.py
from celery import Celery
from kombu import Exchange, Queue
celery_app = Celery(
"myproject",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/1",
include=[
"app.tasks.email_tasks",
"app.tasks.report_tasks",
]
)
celery_app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="Europe/Istanbul",
enable_utc=True,
task_track_started=True,
task_acks_late=True,
worker_prefetch_multiplier=1,
result_expires=3600,
)
# Queue tanımları - önemli!
celery_app.conf.task_queues = (
Queue("default", Exchange("default"), routing_key="default"),
Queue("high_priority", Exchange("high_priority"), routing_key="high_priority"),
Queue("reports", Exchange("reports"), routing_key="reports"),
)
celery_app.conf.task_default_queue = "default"
celery_app.conf.task_default_exchange = "default"
celery_app.conf.task_default_routing_key = "default"
Birkaç önemli ayarı açıklayayım:
- task_acks_late: Worker görevi aldıktan sonra değil, başarıyla tamamladıktan sonra kuyruğa “aldım” sinyali gönderir. Worker crash olursa görev kaybolmaz
- worker_prefetch_multiplier=1: Her worker aynı anda sadece bir görev alır. Uzun süren görevleriniz varsa bu çok önemli, yoksa bir worker tüm ağır görevleri kendi üzerine çekip diğerleri boş kalır
- task_track_started: Görevi “STARTED” durumuna alır, production’da task izleme için şart
Task Yazımı
# app/tasks/email_tasks.py
from app.celery_app import celery_app
from celery.utils.log import get_task_logger
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
logger = get_task_logger(__name__)
@celery_app.task(
bind=True,
max_retries=3,
default_retry_delay=60,
name="tasks.send_welcome_email"
)
def send_welcome_email(self, user_id: int, email: str, username: str):
logger.info(f"Kullanıcıya hoş geldiniz e-postası gönderiliyor: {email}")
try:
msg = MIMEMultipart()
msg["From"] = "[email protected]"
msg["To"] = email
msg["Subject"] = f"Hoş geldiniz, {username}!"
body = f"Merhaba {username}, hesabınız başarıyla oluşturuldu."
msg.attach(MIMEText(body, "plain", "utf-8"))
with smtplib.SMTP("smtp.sirketim.com", 587) as server:
server.starttls()
server.login("[email protected]", "supersecretpassword")
server.sendmail("[email protected]", email, msg.as_string())
logger.info(f"E-posta başarıyla gönderildi: {email}")
return {"status": "success", "email": email}
except smtplib.SMTPException as exc:
logger.error(f"SMTP hatası: {exc}")
raise self.retry(exc=exc, countdown=2 ** self.request.retries * 60)
except Exception as exc:
logger.error(f"Beklenmedik hata: {exc}")
raise self.retry(exc=exc)
bind=True parametresi self ile task nesnesine erişmenizi sağlar. Retry mekanizması için bu şart. Exponential backoff yapıyorum burada: ilk retry 60 saniye, ikinci 120, üçüncü 240 saniye bekliyor. SMTP sunucunuz yük altındayken bunu gerçekten takdir edeceksiniz.
Rapor Görevi – Gerçek Dünya Senaryosu
# app/tasks/report_tasks.py
from app.celery_app import celery_app
from celery.utils.log import get_task_logger
import time
logger = get_task_logger(__name__)
@celery_app.task(
bind=True,
name="tasks.generate_monthly_report",
queue="reports",
soft_time_limit=300,
time_limit=360
)
def generate_monthly_report(self, company_id: int, month: int, year: int):
"""
Aylık rapor üretimi. Uzun sürebilir, ayrı queue'da çalışır.
soft_time_limit: Bu sürede bitmezse SoftTimeLimitExceeded fırlatır
time_limit: Bu sürede bitmezse process kesinlikle öldürülür
"""
try:
# Progress tracking
self.update_state(
state="PROGRESS",
meta={"current": 0, "total": 100, "status": "Veriler toplanıyor..."}
)
# Simüle edilmiş uzun işlem
# Gerçekte DB'den veri çekme, hesaplama vs.
for i in range(1, 101):
time.sleep(0.1) # Gerçek işlem burada
if i % 10 == 0:
self.update_state(
state="PROGRESS",
meta={
"current": i,
"total": 100,
"status": f"İşleniyor... %{i}"
}
)
result = {
"company_id": company_id,
"month": month,
"year": year,
"report_url": f"/reports/{company_id}/{year}/{month}/report.pdf",
"generated_at": "2024-01-15T10:30:00"
}
return result
except Exception as exc:
logger.error(f"Rapor üretim hatası: {exc}")
self.update_state(
state="FAILURE",
meta={"error": str(exc)}
)
raise
FastAPI Entegrasyonu
# app/main.py
from fastapi import FastAPI
from app.routers import notifications
from app.celery_app import celery_app
app = FastAPI(title="MyProject API")
app.include_router(notifications.router, prefix="/api/v1")
@app.get("/health")
async def health_check():
# Celery broker bağlantısını da kontrol et
try:
celery_app.control.ping(timeout=1)
celery_status = "healthy"
except Exception:
celery_status = "unhealthy"
return {
"api": "healthy",
"celery": celery_status
}
# app/routers/notifications.py
from fastapi import APIRouter, HTTPException, BackgroundTasks
from pydantic import BaseModel, EmailStr
from celery.result import AsyncResult
from app.tasks.email_tasks import send_welcome_email
from app.tasks.report_tasks import generate_monthly_report
router = APIRouter()
class UserRegistrationRequest(BaseModel):
user_id: int
email: EmailStr
username: str
class ReportRequest(BaseModel):
company_id: int
month: int
year: int
@router.post("/send-welcome-email")
async def trigger_welcome_email(request: UserRegistrationRequest):
task = send_welcome_email.delay(
user_id=request.user_id,
email=request.email,
username=request.username
)
return {
"message": "E-posta kuyruğa eklendi",
"task_id": task.id,
"status_url": f"/api/v1/tasks/{task.id}"
}
@router.post("/generate-report")
async def trigger_report_generation(request: ReportRequest):
# apply_async ile daha fazla kontrol
task = generate_monthly_report.apply_async(
args=[request.company_id, request.month, request.year],
queue="reports",
countdown=0,
expires=3600 # 1 saat içinde başlamazsa iptal et
)
return {
"message": "Rapor üretimi başlatıldı",
"task_id": task.id
}
@router.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
task_result = AsyncResult(task_id)
if task_result.state == "PENDING":
response = {
"task_id": task_id,
"state": task_result.state,
"status": "Görev kuyruğa alındı, işlenmeyi bekliyor"
}
elif task_result.state == "PROGRESS":
response = {
"task_id": task_id,
"state": task_result.state,
"progress": task_result.info
}
elif task_result.state == "SUCCESS":
response = {
"task_id": task_id,
"state": task_result.state,
"result": task_result.result
}
elif task_result.state == "FAILURE":
response = {
"task_id": task_id,
"state": task_result.state,
"error": str(task_result.info)
}
else:
response = {
"task_id": task_id,
"state": task_result.state
}
return response
Worker’ı Başlatmak
Geliştirme ortamında:
# Default queue için worker
celery -A app.celery_app worker --loglevel=info -Q default
# Reports queue için ayrı worker (ağır işler için)
celery -A app.celery_app worker --loglevel=info -Q reports --concurrency=2 -n reports_worker@%h
# Her iki queue'yu da dinleyen worker
celery -A app.celery_app worker --loglevel=info -Q default,high_priority --concurrency=4
Flower ile monitoring:
celery -A app.celery_app flower --port=5555 --broker=redis://localhost:6379/0
Production için systemd service dosyası:
# /etc/systemd/system/celery-worker.service
[Unit]
Description=Celery Worker Service
After=network.target redis.service
[Service]
Type=forking
User=appuser
Group=appuser
WorkingDirectory=/opt/myproject
Environment="PATH=/opt/myproject/venv/bin"
ExecStart=/opt/myproject/venv/bin/celery -A app.celery_app multi start
default_worker
reports_worker
--pidfile=/var/run/celery/%n.pid
--logfile=/var/log/celery/%n%I.log
--loglevel=info
-Q:default_worker default,high_priority
-Q:reports_worker reports
--concurrency:default_worker=4
--concurrency:reports_worker=2
ExecStop=/opt/myproject/venv/bin/celery multi stopwait
default_worker reports_worker
--pidfile=/var/run/celery/%n.pid
ExecReload=/opt/myproject/venv/bin/celery multi restart
default_worker reports_worker
--pidfile=/var/run/celery/%n.pid
--logfile=/var/log/celery/%n%I.log
RuntimeDirectory=celery
RuntimeDirectoryMode=0755
StandardError=syslog
SyslogIdentifier=celery
[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl enable celery-worker
sudo systemctl start celery-worker
sudo systemctl status celery-worker
Periyodik Görevler – Celery Beat
Zamanlanmış görevler için Celery Beat kullanıyoruz:
# celeryconfig.py
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
"generate-monthly-reports": {
"task": "tasks.generate_monthly_report",
"schedule": crontab(hour=2, minute=0, day_of_month=1),
"args": (1, None, None), # company_id, month, year - None olanlar hesaplanır
"options": {"queue": "reports"}
},
"cleanup-old-results": {
"task": "tasks.cleanup_expired_results",
"schedule": crontab(hour=3, minute=30),
"options": {"queue": "default"}
}
}
# Beat servisini başlatmak
celery -A app.celery_app beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler
Sık Karşılaşılan Sorunlar ve Çözümleri
Circular import problemi: FastAPI ve Celery aynı modülleri import ettiğinde döngüsel bağımlılık oluşuyor. Bunu çözmek için Celery uygulamasını ayrı bir modülde tutun ve task’larda FastAPI bileşenlerini import etmeyin.
Task’ların serialize edilememesi: Pydantic model nesnelerini direkt task’a geçmeyin. Task parametrelerinizi her zaman primitive tipler olarak tanımlayın (int, str, dict). Task içinde gerekirse nesneyi yeniden oluşturun.
Worker bellek sızıntısı: Uzun süre çalışan worker’larda bellek artıyor mu? --max-tasks-per-child parametresiyle worker’ları belirli sayıda task sonrası yeniden başlatın:
celery -A app.celery_app worker --max-tasks-per-child=100 --loglevel=info
Redis bağlantı kopukluğu: Production’da Redis bağlantısı zaman zaman kopabilir. Broker transport seçeneklerine retry ayarları ekleyin:
celery_app.conf.broker_transport_options = {
"visibility_timeout": 3600,
"fanout_prefix": True,
"fanout_patterns": True,
"socket_connect_timeout": 5,
"socket_timeout": 5,
"retry_on_timeout": True,
}
Sonuç
FastAPI ile Celery entegrasyonu ilk bakışta karmaşık görünüyor, ama doğru yapılandırıldığında inanılmaz güçlü bir kombinasyon ortaya çıkıyor. Birkaç kritik noktayı tekrar vurgulayayım:
- Ayrı queue’lar kullanın. Ağır raporlama işlemleri ile hızlı bildirim işlemlerini aynı kuyruğa koymayın. Yoksa bir rapor görevi kuyruğu tıkar ve e-postalarınız saatlerce bekler.
- Retry mekanizmasını iyi tasarlayın. Exponential backoff’u hiçbir zaman atlamamalısınız, özellikle üçüncü parti servislere bağımlısanız.
- Task’larınızı idempotent yazın. Aynı task iki kez çalışsa da sonuç değişmemeli.
task_acks_lateile birleşince worker crash senaryosunda hayat kurtarır. - Flower’ı production’a açmayın. Monitoring için kullanın ama authentication olmadan internete açık bırakmak ciddi güvenlik riski.
- Log’larınızı takip edin.
get_task_loggerkullanın ve task ID’leri her zaman log’a yazın. Bir şeyler ters gittiğinde task ID üzerinden tüm süreci izleyebilirsiniz.
Bu yapıyı birkaç farklı projede kullandım ve response sürelerini ortalama 2-3 saniyeden 50-100 milisaniyeye düşürdüm. Kullanıcı tarafında fark gece gündüz gibi.
