Celery Nedir: Python Uygulamalarında Asenkron Görevler
Bir web uygulaması geliştiriyorsunuz, kullanıcı “Şifremi Unuttum” butonuna basıyor ve siz o anda e-posta gönderme işlemini senkron olarak yapıyorsunuz. Kullanıcı ekranda 3-4 saniye beklemeye başlıyor. Sonra birisi “toplu fatura gönder” özelliği istiyor, bu sefer 10 saniye. Sonra “tüm kullanıcılara bildirim gönder” geliyor… İşte tam bu noktada Celery devreye giriyor ve hayatınızı kurtarıyor.
Celery, Python ekosistemindeki en olgun ve en yaygın kullanılan dağıtık görev kuyruğu sistemidir. 2009’dan beri aktif geliştirme altında olan bu kütüphane, Django ve Flask gibi web framework’leriyle neredeyse akıl almaz bir uyum içinde çalışır. Ama Celery’yi sadece “e-posta gönderme işini arkaya at” aracı olarak görmek büyük bir hata. Gerçek kullanım senaryoları çok daha derine iner.
Celery’nin Temel Mimarisi
Celery’yi anlamak için üç temel bileşeni kavramak gerekiyor:
Worker (İşçi): Asıl işi yapan süreçler. Görevleri kuyruktan alır ve çalıştırır. Birden fazla worker aynı anda farklı görevleri işleyebilir.
Broker (Aracı): Worker’lar ile uygulamanız arasındaki mesaj kanalı. Redis ve RabbitMQ en yaygın kullanılan broker’lardır. Görevler önce broker’a yazılır, worker’lar oradan okur.
Result Backend: Görev sonuçlarını saklamak için kullanılır. Her zaman gerekli değildir ama görev durumunu sorgulamak istiyorsanız şart.
Akış şu şekilde işler: Uygulamanız bir görevi kuyruğa iter, broker bunu saklar, boşta olan bir worker bu görevi alır ve çalıştırır, sonucu result backend’e yazar. Bu kadar basit ama bu basitliğin altında inanılmaz bir güç var.
Kurulum ve İlk Yapılandırma
Önce gerekli paketleri kuralım. Ben genellikle Redis’i broker olarak kullanırım çünkü RabbitMQ’ya kıyasla kurulumu çok daha basit ve küçük-orta ölçekli projelerde fazlasıyla yeterli:
pip install celery redis
# veya Django kullanıyorsanız
pip install celery[redis] django-celery-beat django-celery-results
Redis’i sisteme kurmak için:
# Ubuntu/Debian
sudo apt-get install redis-server
sudo systemctl enable redis-server
sudo systemctl start redis-server
# Redis'in çalıştığını doğrulayın
redis-cli ping
# Çıktı: PONG
Şimdi temel bir Celery uygulaması oluşturalım. Diyelim ki myapp adında bir proje var:
# myapp/celery_app.py
from celery import Celery
app = Celery(
'myapp',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1',
include=['myapp.tasks']
)
app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='Europe/Istanbul',
enable_utc=True,
# Görev sonuçlarını 1 gün sakla
result_expires=86400,
# Worker başına maksimum görev sayısı (bellek sızıntısına karşı)
worker_max_tasks_per_child=1000,
)
İlk Görevinizi Tanımlamak
Celery’de görevler @app.task dekoratörü ile tanımlanır. Basit bir örnekle başlayalım:
# myapp/tasks.py
from myapp.celery_app import app
import time
import logging
logger = logging.getLogger(__name__)
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def send_email_task(self, recipient, subject, body):
"""
E-posta gönderme görevi.
bind=True: self parametresi ile görev nesnesine erişim sağlar
max_retries=3: Hata durumunda 3 kez tekrar dene
default_retry_delay=60: Tekrar denemeler arası 60 saniye bekle
"""
try:
logger.info(f"E-posta gönderiliyor: {recipient}")
# Gerçek e-posta gönderme kodu buraya gelir
# send_mail(subject, body, '[email protected]', [recipient])
time.sleep(2) # Simülasyon için
logger.info(f"E-posta başarıyla gönderildi: {recipient}")
return {'status': 'success', 'recipient': recipient}
except Exception as exc:
logger.error(f"E-posta gönderilemedi: {exc}")
# Görevi yeniden kuyruğa at
raise self.retry(exc=exc, countdown=60)
@app.task
def generate_report(report_type, filters=None):
"""Rapor oluşturma görevi - uzun süren işlemler için ideal"""
logger.info(f"Rapor oluşturuluyor: {report_type}")
# Ağır veritabanı sorguları, Excel oluşturma vs.
time.sleep(10)
return {'report': report_type, 'status': 'completed'}
Görevi çalıştırmak için:
# Görevi kuyruğa atmak (uygulamanızın herhangi bir yerinden)
from myapp.tasks import send_email_task
# Asenkron olarak çalıştır (hemen döner, işi arka planda yapar)
result = send_email_task.delay('[email protected]', 'Hoş Geldiniz', 'Merhaba!')
# Görev ID'sini saklayın, durumu sorgulamak için kullanabilirsiniz
print(f"Görev ID: {result.id}")
# Sonucu beklemek istiyorsanız (bunu production'da dikkatli kullanın)
# result.get(timeout=30)
Django ile Entegrasyon
Django projesinde Celery kurmak biraz daha özel bir yapı gerektirir. Django’nun settings modülünü Celery’nin görmesi için:
# myproject/celery.py
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
# myproject/__init__.py dosyasına ekleyin:
# from .celery import app as celery_app
# __all__ = ('celery_app',)
settings.py dosyasına Celery ayarlarını ekleyin:
# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/Istanbul'
# Periyodik görevler için (Celery Beat)
CELERY_BEAT_SCHEDULE = {
'gunluk-rapor': {
'task': 'myapp.tasks.generate_daily_report',
'schedule': crontab(hour=8, minute=0), # Her gün sabah 08:00
},
'cache-temizle': {
'task': 'myapp.tasks.clear_expired_cache',
'schedule': crontab(minute='*/30'), # Her 30 dakikada bir
},
}
Gerçek Dünya Senaryosu: E-ticaret Sipariş İşleme
Benim en çok karşılaştığım senaryo e-ticaret sistemleri. Bir müşteri sipariş verdiğinde onlarca iş tetiklenmesi gerekiyor. Bunların hepsini senkron yapmak kullanıcıyı dakikalarca bekleteceğinden Celery burada kritik rol oynuyor:
# ecommerce/tasks.py
from celery import app, chain, group, chord
from myproject.celery import app
@app.task
def send_order_confirmation(order_id):
"""Sipariş onay e-postası gönder"""
# order = Order.objects.get(id=order_id)
# send_mail(...)
return f"Onay e-postası gönderildi: #{order_id}"
@app.task
def update_inventory(order_id):
"""Stok güncelle"""
# Ürün stoklarını azalt
return f"Stok güncellendi: #{order_id}"
@app.task
def notify_warehouse(order_id):
"""Depoyu bilgilendir"""
# Depo sistemine API çağrısı
return f"Depo bilgilendirildi: #{order_id}"
@app.task
def update_customer_statistics(customer_id):
"""Müşteri istatistiklerini güncelle"""
# Toplam harcama, sipariş sayısı vs.
return f"İstatistikler güncellendi: {customer_id}"
def process_new_order(order_id, customer_id):
"""
Sipariş geldiğinde bu fonksiyonu çağırın.
group: Görevleri paralel çalıştırır
chain: Görevleri sırayla çalıştırır (birinin çıktısı diğerine gider)
"""
# Stok güncelleme ve depo bildirimi paralel çalışsın
parallel_tasks = group(
update_inventory.s(order_id),
notify_warehouse.s(order_id),
update_customer_statistics.s(customer_id),
)
# Önce paralel işler, sonra onay e-postası
workflow = chain(
parallel_tasks,
send_order_confirmation.s(order_id)
)
workflow.delay()
Bu yapıda group ile paralel çalıştırma, chain ile sıralı çalıştırma yapıyorsunuz. Celery’nin bu primitifleri gerçekten güçlü.
Worker’ları Yönetmek
Worker’ları başlatmak için:
# Temel worker başlatma
celery -A myproject worker --loglevel=info
# Birden fazla worker process ile (CPU sayısına göre ayarlayın)
celery -A myproject worker --concurrency=4 --loglevel=info
# Belirli kuyrukları dinleyen worker
celery -A myproject worker -Q emails,reports --loglevel=info
# Periyodik görevler için Beat servisini başlatın
celery -A myproject beat --loglevel=info
# Hem worker hem beat aynı anda (sadece geliştirme için, production'da ayrı çalıştırın)
celery -A myproject worker --beat --loglevel=info
Production ortamında Celery’yi Systemd ile yönetmek en mantıklı yöntem:
# /etc/systemd/system/celery.service
[Unit]
Description=Celery Worker Service
After=network.target redis.service
[Service]
Type=forking
User=www-data
Group=www-data
WorkingDirectory=/var/www/myproject
Environment="PATH=/var/www/myproject/venv/bin"
ExecStart=/var/www/myproject/venv/bin/celery
-A myproject worker
--loglevel=info
--concurrency=4
--logfile=/var/log/celery/worker.log
--pidfile=/var/run/celery/worker.pid
--detach
ExecStop=/bin/kill -TERM $MAINPID
ExecReload=/bin/kill -HUP $MAINPID
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl enable celery
sudo systemctl start celery
sudo systemctl status celery
Kuyruk Önceliklendirme ve Yönlendirme
Her görevi aynı kuyruğa atmak büyük bir hata. Düşünün: Kullanıcının şifre sıfırlama e-postası ile haftalık toplu rapor maili aynı kuyruğa girdi. Toplu rapor maili 10.000 kullanıcıya gidecek, şifre sıfırlama maili arka sıraya takıldı. Kullanıcı 20 dakika bekliyor…
# Farklı görevleri farklı kuyruklara yönlendirme
@app.task(queue='high_priority')
def send_password_reset(email):
pass
@app.task(queue='low_priority')
def send_weekly_newsletter(email_list):
pass
@app.task(queue='reports')
def generate_monthly_report():
pass
# settings.py'de kuyruk yapılandırması
from kombu import Queue
CELERY_TASK_QUEUES = (
Queue('high_priority', routing_key='high.#'),
Queue('default', routing_key='default.#'),
Queue('low_priority', routing_key='low.#'),
Queue('reports', routing_key='reports.#'),
)
CELERY_TASK_DEFAULT_QUEUE = 'default'
# Her kuyruk için ayrı worker başlatın
# celery -A myproject worker -Q high_priority --concurrency=8
# celery -A myproject worker -Q default --concurrency=4
# celery -A myproject worker -Q low_priority,reports --concurrency=2
Monitoring: Flower ile Gözetim
Production’da ne olduğunu bilmeden uçmak istemezsiniz. Flower, Celery için web tabanlı bir izleme aracı:
pip install flower
# Flower'ı başlatın
celery -A myproject flower --port=5555 --basic_auth=admin:gizlisifre
# Systemd ile yönetmek için
# /etc/systemd/system/celery-flower.service dosyası oluşturun
Flower size şunu gösterir:
- Aktif, bekleyen ve başarısız görev sayıları
- Worker’ların durumu ve yük dağılımı
- Görev geçmişi ve hata logları
- Ortalama görev süresi istatistikleri
Bunun yanında Redis CLI ile de hızlı durum kontrolü yapabilirsiniz:
# Kuyruktaki görev sayısını kontrol et
redis-cli llen celery
# Tüm Celery anahtarlarını listele
redis-cli keys "celery*"
# Aktif görev sayısı
celery -A myproject inspect active | grep -c "task"
# Worker durumu
celery -A myproject status
Yaygın Hatalar ve Çözümleri
Görev tamamlanmıyor gibi görünüyor: Büyük ihtimalle worker’da bir exception oluştu ve görev failed durumuna düştü. Her zaman try-except kullanın ve hataları loglayın. result.state ile durumu sorgulayın.
Bellek tükeniyor: Worker süreçleri zamanla bellek tüketebilir. worker_max_tasks_per_child parametresini ayarlayın. Ben genellikle 500-1000 aralığını kullanırım.
Aynı görev birden fazla kez çalışıyor: Bu “at-least-once delivery” garantisinin bir sonucu. Görevlerinizi idempotent yapın, yani aynı görev birden fazla çalışsa bile sonuç değişmesin. Veritabanı işlemleri için benzersiz kısıtlamalar ekleyin.
Broker bağlantısı kopuyor: Production’da Redis/RabbitMQ için connection pooling ve heartbeat ayarlarını yapılandırın:
# Bağlantı kararlılığı için
CELERY_BROKER_TRANSPORT_OPTIONS = {
'visibility_timeout': 3600,
'max_retries': 3,
'interval_start': 0,
'interval_step': 0.2,
'interval_max': 0.5,
}
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
Celery Beat ile Zamanlanmış Görevler
Celery Beat, cron benzeri periyodik görev planlayıcısıdır. Django projelerinde django-celery-beat paketi ile görevleri veritabanından yönetebilirsiniz, yani yeniden deploy olmadan görev zamanlarını değiştirebilirsiniz:
pip install django-celery-beat
# settings.py INSTALLED_APPS'e ekleyin:
# 'django_celery_beat',
python manage.py migrate
# Programatik olarak zamanlanmış görev oluşturma
from django_celery_beat.models import PeriodicTask, CrontabSchedule
import json
schedule, created = CrontabSchedule.objects.get_or_create(
minute='0',
hour='9',
day_of_week='1-5', # Pazartesi-Cuma
day_of_month='*',
month_of_year='*',
timezone='Europe/Istanbul'
)
PeriodicTask.objects.get_or_create(
crontab=schedule,
name='Sabah raporu',
task='myapp.tasks.send_morning_report',
args=json.dumps([]),
kwargs=json.dumps({'department': 'all'}),
)
Task Retry Stratejileri
Üretim ortamlarında ağ hataları, geçici servis kesintileri kaçınılmaz. Exponential backoff ile retry yapmak en iyi pratik:
@app.task(
bind=True,
autoretry_for=(ConnectionError, TimeoutError),
max_retries=5,
retry_backoff=True, # Exponential backoff aktif
retry_backoff_max=600, # Maksimum 10 dakika bekle
retry_jitter=True # Thundering herd problemini önle
)
def call_external_api(self, endpoint, data):
"""
Dış API çağrısı - hata durumunda otomatik retry
retry_jitter: Her retry'a rastgele gecikme ekler
"""
import requests
response = requests.post(endpoint, json=data, timeout=30)
response.raise_for_status()
return response.json()
retry_jitter parametresi küçük görünüyor ama çok önemli. Onlarca görev aynı anda hata aldığında ve hepsi aynı anda retry yapmaya kalktığında “thundering herd” problemi yaşanıyor. Rastgele gecikme bunu önlüyor.
Sonuç
Celery, doğru kullanıldığında Python web uygulamalarının performansını ve ölçeklenebilirliğini dramatik şekilde artıran bir araç. Yanlış kullanıldığında ise debugging kabusu olabiliyor.
Öğrendiğim en önemli dersler şunlar:
- Küçük başlayın: İlk Celery entegrasyonunuzu tek bir görev türüyle yapın, sonra genişletin.
- Görevleri idempotent tasarlayın: Aynı görev birden fazla çalışsa da sistem bozulmamalı.
- Kuyrukları ayırın: Kritik ve kritik olmayan görevleri asla aynı kuyruğa atmayın.
- Monitoring zorunlu: Flower veya benzeri bir araç olmadan production’a çıkmayın.
- Retry stratejisi şart: Görevlerinizin nasıl başarısız olduğunu ve ne yapılacağını önceden planlayın.
- Loglama her şeydir: Her görevin başında ve sonunda log yazın, hata ayıklamak çok kolaylaşıyor.
Celery öğrenmek için harcadığınız zaman, ilerleyen dönemde size kat kat geri dönüyor. “Neden bu sayfa bu kadar yavaş?” sorusunu sormayı bırakıp “Bu görevi ne zaman kuyruğa atayım?” sorusunu sorduğunuz günden itibaren web geliştirme deneyiminiz köklü biçimde değişiyor.
